This chapter is taken from the book "Introducing
Microsoft SQL Server 2008 R2" by Ross Mistry and Stacia Misner, published by
Microsoft Press.
Microsoft SQL Server StreamInsight is a complex event processing (CEP) engine.
This technology is a new offering in the SQL Server family, making its first
appearance in SQL Server 2008 R2. It ships with the Standard, Enterprise, and
Datacenter editions of SQL Server 2008 R2. StreamInsight is both an engine built
to process high-throughput streams of data with low latency and a Microsoft .NET
Framework platform for developers of CEP applications. The goal of a CEP
application is to rapidly aggregate high volumes of raw data for analysis as it
streams from point to point. You can apply analytical techniques to trigger a
response upon crossing a threshold or to find trends or exceptions in the data
without first storing it in a data warehouse.
Complex Event Processing
Complex event processing is the task of sifting through streaming data to find
meaningful information. It might involve performing calculations on the data to
derive information, or the information might be the revelation of significant
trends. As a development platform, StreamInsight can support most types of CEP
applications that you might need.
Complex Event Processing Applications
There are certain industries that regularly produce high volumes of streaming
data. Manufacturing and utilities companies use sensors, meters, and other
devices to monitor processes and alert users when the system identifies events
that could lead to a potential failure. Financial trading firms must monitor
market prices for stocks, commodities, and other financial instruments and
rapidly calculate profits or losses based on changing conditions.
Similarly, there are certain types of applications that benefit from the ability
to analyze data as close as possible to the time that the applications capture
the data. For example, companies selling products online often use clickstream
analysis to change the page layout and site navigation and to display targeted
advertising while a user remains connected to a site. Credit card companies
monitor transactions for exceptions to normal spending activities that could
indicate fraud.
The challenge with CEP arises when you need to process and analyze the data
before you have time to perform ETL activities to move the data into a more
traditional analytical environment, such as a data warehouse. In CEP
applications, the value of the information derived from low-latency processing,
defined in milliseconds, can be extremely high. This value begins to diminish as
the data ages. Adding to the challenge is the rate at which source applications
generate data, often tens of thousands of records per second.
StreamInsight Highlights
StreamInsight's CEP server includes a core engine that is built to process
high-throughput data. The engine achieves high performance by executing highly
parallel queries and using in-memory caches to avoid incurring the overhead of
storing data for processing. The engine can handle data that arrives at a steady
rate or in intermittent bursts, and can even rearrange data that arrives out of
sequence. Queries can also incorporate nonstreaming data sources, such as master
reference data or historical data maintained in a data warehouse.
You write your CEP applications using a .NET language, such as Visual Basic or
C#, for rapid application development. In your applications, you embed
declarative queries using Language Integrated Query (LINQ) expressions to
process the data for analysis.
StreamInsight also includes other tools for administration and development
support. The CEP server has a management interface and diagnostic views that you
can use to develop applications to monitor StreamInsight. For development
support, StreamInsight includes an event flow debugger that you can use to
troubleshoot queries. An example of a situation that might require
troubleshooting is the arrival of a larger number of events than expected.
StreamInsight Architecture
As with any new technology, you will find it helpful to have an understanding of
the StreamInsight architecture before you begin development of your first CEP
application. Your application must restructure data streams to a format usable
by the processing engine. You use adapters to perform this restructuring before
passing the data to queries that run on the CEP server. The way you choose to
develop your application also depends on the deployment model you use to
implement StreamInsight.
Data Structures
The high-throughput data that StreamInsight requires is known as a stream. More
specifically, a stream is a collection of data that changes over time. For
example, a Web log contains data about each server hit, including the date,
time, page request, and Internet protocol (IP) address of the visitor. If a
visitor clicks on several pages in the Web site, the Web log contains multiple
lines, or hits, for the same visitor, and each line records a different time.
The information in the Web log shows how each user's activity in a Web site
changes over time, which is why this type of information is considered a stream.
You can query this stream to find the average number of hits or the top five
referring sites over time.
StreamInsight splits a stream into individual units called events. An event
contains a header and a payload. The event header includes the event kind and
one or more timestamps for the event. The event kind is an indicator of a new
event or the completeness of events already in the stream. The payload contains
the event's data as a .NET data structure.
There are three types of event models that StreamInsight uses. The interval
event model represents events with a fixed duration, such as a stock bid price
that is valid only for a certain period of time. The edge event model is another
type of duration model, but it represents an event with a duration that is
unknown at the time the event starts, such as a Web user session. The point
model represents events that occur at a specific point in time, such as a Web
user's click entry in a Web log.
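The three event models can be pictured with plain .NET types. The following is a minimal sketch, not the StreamInsight API: SketchEvent and its factory methods are hypothetical names, and modeling a point event as a one-tick duration and an unfinished edge event as an end time of DateTimeOffset.MaxValue are assumptions made only for illustration.

```csharp
using System;

// Hypothetical illustration type; not part of the StreamInsight API.
public sealed class SketchEvent<TPayload>
{
    public DateTimeOffset StartTime { get; }
    public DateTimeOffset EndTime { get; }   // MaxValue means "end not yet known"
    public TPayload Payload { get; }

    private SketchEvent(DateTimeOffset start, DateTimeOffset end, TPayload payload)
    {
        if (end <= start) throw new ArgumentException("End time must follow start time.");
        StartTime = start;
        EndTime = end;
        Payload = payload;
    }

    // Point model: a single instant, modeled here as a one-tick duration (assumption).
    public static SketchEvent<TPayload> Point(DateTimeOffset at, TPayload payload)
        => new SketchEvent<TPayload>(at, at.AddTicks(1), payload);

    // Interval model: start and end are both known when the event is created.
    public static SketchEvent<TPayload> Interval(DateTimeOffset start, DateTimeOffset end, TPayload payload)
        => new SketchEvent<TPayload>(start, end, payload);

    // Edge model: the start is known now; the end arrives later.
    public static SketchEvent<TPayload> EdgeStart(DateTimeOffset start, TPayload payload)
        => new SketchEvent<TPayload>(start, DateTimeOffset.MaxValue, payload);

    // Closes an open edge event once its end time is known.
    public SketchEvent<TPayload> EdgeEnd(DateTimeOffset end)
        => new SketchEvent<TPayload>(StartTime, end, Payload);

    public bool IsOpen => EndTime == DateTimeOffset.MaxValue;
}
```

A Web user session would start as EdgeStart(...) and later be closed with EdgeEnd(...), whereas a single click would be created with Point(...).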
The CEP Server
The CEP server is a run-time engine and a set of adapter instances that receive
and send events, as shown in Figure 8-1. You develop these adapters in a .NET
language and register the assemblies on the CEP server, which then instantiates
the adapters at run time. Input adapters receive data as a continuous stream
from event stores, such as sensors on a factory floor, Web servers, data feeds,
or databases. The data passes from the input adapter to the CEP engine, which
processes and transforms the data by using standing queries, which are query
instances that the CEP engine manages. The engine then forwards the query
results to output adapters, which connect to event consumers, such as pagers,
monitoring devices, dashboards, and databases. The output adapters can also
include logic to trigger a response based on the query results.
FIGURE 8-1 StreamInsight architecture
Input Adapters
The input adapters translate the incoming events into the event format that the
CEP engine requires. You can create a typed adapter if the source produces a
single event type only, but you must create an untyped adapter when the payload
format differs across events or is unknown in advance. In the case of the typed
adapter, the payload format is defined in advance with a static number of fields
and data types when you implement the adapter. By contrast, an untyped adapter
receives the payload format only when the adapter binds to the query (as part of
a configuration specification). In the latter case, the number of fields and
data types can vary with each query instantiation.
Output Adapters
The output adapters reverse the operations of the input adapters by translating
events into a format that is usable by the target device and then sending the
translated data to the device. The development process for an output adapter is
very similar to the process you use to develop an input adapter.
Query Instances
Standing queries receive the stream of data from an input adapter, apply
business logic to the data (such as an aggregation), and send the results as an
event stream to an output adapter. You encapsulate the business logic used by a
standing query instance in a query template that you develop using a combination
of LINQ and a .NET language. To create the standing query instance in the CEP
server, you bind a query template with specific input and output. You can use
the same query template with multiple standing queries. After you instantiate a
query, you can start, stop, or manage it.
Deployment Models
You have two options for deploying StreamInsight. You can integrate the CEP
server into an application as a hosted assembly, or you can deploy it as a
standalone server.
Hosted Assembly
Embedding the CEP server into a host application is a simple deployment
approach. You have greater flexibility than you would have with a standalone
server because there are no dependencies between applications that you must
consider before making changes. Each application and the CEP server run as a
single process, which may be easier to manage on your server.
You can use any of the development approaches described later, in the
"Application Development" section of this chapter, when hosting the CEP server
in your application. However, if you decide later that you want your application
to run on a standalone server, you will need to rewrite your application using
the explicit server development model.
Standalone Server
You should deploy the CEP server as a standalone server when applications need
to share event streams or metadata objects. For example, you can reuse event
types, adapter types, and query templates and thereby minimize the impact of
changes to any of these metadata objects across applications by maintaining a
single copy. You can run the CEP server as an executable, or you can configure
it as a Windows service. If you want to run it as a service application, you can
use StreamInsightHost.exe as a host process or develop your own host process.
If you choose to deploy CEP as a standalone server, there are some limitations
that affect the way you develop applications. First, you can use only the
explicit server development model (which is described in the next section of
this chapter) when developing CEP applications for a standalone server. Second,
you must connect to the CEP server by using the Web service Uniform Resource
Identifier (URI) of the CEP server host process.
Application Development
You start the typical development cycle for a new CEP application by sampling
the existing data streams and developing functions to process the data. You then
test the functions, review the results, and determine the changes necessary to
improve the functions. This process continues in an iterative fashion until you
complete development.
As part of the development of your CEP application, you create event types,
adapters, and query templates. The way you use these objects depends on the
development model you choose. When you develop using the explicit server
development model, you explicitly create and register all of these objects and
can reuse these objects in multiple applications. In the implicit server
development model, you concentrate on the development of the query logic and
rely on the CEP server to act as an implicit host and to create and register the
necessary objects.
TIP You can locate and download sample applications by searching for
StreamInsight at CodePlex (http://www.codeplex.com).
Event Types
An event type defines events published by the event source or consumed by the
event consumer. You use event types with a typed adapter or as objects in LINQ
expressions that you use in query templates. You create an event type as a .NET
Framework class or structure by using only public fields and properties as the
payload fields, like this:
    public class sampleEvent
    {
        public string eventId { get; set; }
        public double eventValue { get; set; }
    }
An event type can have no more than 32 payload fields. Payload fields must be
only scalar or elementary CLR types. You can use nullable types, such as int?
instead of int. The string and byte[] types are always nullable.
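The payload rules above can be checked with reflection before you register an event type. The following is a rough sketch under stated assumptions: PayloadCheck, SampleReading, and BadReading are hypothetical names, and the list of types treated as "scalar or elementary" is this sketch's own guess rather than the authoritative StreamInsight list. Only the limit of 32 fields comes from the text.

```csharp
using System;
using System.Linq;
using System.Reflection;

public static class PayloadCheck
{
    public const int MaxPayloadFields = 32;   // limit stated in the text

    // Payload members are the public instance fields and properties.
    public static Type[] PayloadMemberTypes(Type eventType)
    {
        const BindingFlags flags = BindingFlags.Public | BindingFlags.Instance;
        return eventType.GetFields(flags).Select(f => f.FieldType)
            .Concat(eventType.GetProperties(flags).Select(p => p.PropertyType))
            .ToArray();
    }

    // Assumption of this sketch: which CLR types count as scalar.
    public static bool IsScalar(Type t)
    {
        Type core = Nullable.GetUnderlyingType(t) ?? t;   // int? is checked as int
        return core.IsPrimitive
            || core == typeof(decimal) || core == typeof(string)
            || core == typeof(DateTime) || core == typeof(DateTimeOffset)
            || core == typeof(TimeSpan) || core == typeof(Guid)
            || core == typeof(byte[]);
    }

    public static bool IsValidEventType(Type eventType)
    {
        Type[] members = PayloadMemberTypes(eventType);
        return members.Length >= 1
            && members.Length <= MaxPayloadFields
            && members.All(IsScalar);
    }
}

// Hypothetical event type that passes the check.
public class SampleReading
{
    public string SensorId { get; set; }
    public double? Value { get; set; }   // nullable: a reading may be missing
}

// Hypothetical event type that fails the check: a collection is not scalar.
public class BadReading
{
    public System.Collections.Generic.List<double> Values { get; set; }
}
```

Calling PayloadCheck.IsValidEventType(typeof(SampleReading)) accepts the type, while the same call for BadReading rejects it because of the list-valued property.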
You do not create an event type when your application uses untyped adapters for
scenarios that must support multiple event types. For example, an input adapter
for tables in a SQL
Server database must adapt to the schema of the table that it queries. Instead,
you provide the table schema in a configuration specification when the adapter
is bound to the query. Conversely, an untyped output adapter receives the event
type description, which contains a list of fields, when the query starts. The
untyped output adapter must then map the event type to the schema of the
destination data source, typically in a configuration specification.
Adapters
Input and output adapters provide transformation interfaces between event
sources, event consumers, and the CEP server. Event sources can push events to
event consumers, or event consumers can pull events from event sources. Either
way, the CEP application operates between these two points and intercepts the
events for processing. The input adapter reads events from the source,
transforms them into a format recognizable by the CEP server, and provides the
transformed events to a standing query. As the CEP server processes the event
stream, the output adapter receives the resulting new events, transforms them
for the event consumers, and then delivers the transformed events.
Before you can begin developing an adapter, you must know whether you are
building an input or output adapter. You must also know the event type, which in
this context means you must understand the structure of the event payload and
how the application timestamps affect stream processing. The .NET class or
structure of the event type provides you with information about the event
payload if you are building a typed adapter. The information necessary for the
management of stream processing, known as event metadata, comes from an
interface in the adapter API when it creates an event. In addition to knowing
the event payload and event metadata, you must also know whether the shape of
the event is a point, interval, or edge model. Having this information available
allows you to choose the applicable base class. The adapter base classes are
listed in Table 8-1.
TABLE 8-1 Adapter base classes
If you are developing an untyped input adapter, you must ensure that it can use
the configuration specification during query bind time to determine the event's
field types by inference from the query's SELECT statement. You must also add
code to the adapter to populate
the fields one at a time and enqueue the event. The untyped output adapter works
similarly, but instead it must be able to use the configuration specification to
retrieve query processing results from a dequeued event.
The next step is to develop an AdapterFactory object as a container class for
your input and output adapters. You use an AdapterFactory object to share
resources between adapter implementations and to pass configuration parameters
to adapter constructors. Recall that an untyped adapter relies on the
configuration specification to properly handle an event's payload structure. The
adapter factory must implement the Create() and Dispose() methods as shown in
the following code example, which shows how to create adapters for events in a
text file:
    public class TextFileInputFactory : IInputAdapterFactory<TextFileInputConfig>
    {
        // Returns the adapter that matches the event shape requested at bind time.
        public InputAdapterBase Create(TextFileInputConfig configInfo,
            EventShape eventShape, CepEventType cepEventType)
        {
            InputAdapterBase adapter = default(InputAdapterBase);
            if (eventShape == EventShape.Point)
            {
                adapter = new TextFilePointInput(configInfo, cepEventType);
            }
            else if (eventShape == EventShape.Interval)
            {
                adapter = new TextFileIntervalInput(configInfo, cepEventType);
            }
            else if (eventShape == EventShape.Edge)
            {
                adapter = new TextFileEdgeInput(configInfo, cepEventType);
            }
            else
            {
                throw new ArgumentException(
                    string.Format(CultureInfo.InvariantCulture,
                        "TextFileInputFactory cannot instantiate adapter with event shape {0}",
                        eventShape.ToString()));
            }
            return adapter;
        }

        public void Dispose()
        {
        }
    }
The final step is to create a .NET assembly for the adapter. At minimum, the
adapter includes a constructor, a Start() method, a Resume() method, and either
a ProduceEvents() or ConsumeEvents() method, depending on whether you are
developing an input adapter or an output adapter. You can see the general
structure of the adapter class in the following code example:
    public class TextFilePointInput : PointInputAdapter
    {
        public TextFilePointInput(TextFileInputConfig configInfo,
            CepEventType cepEventType)
        { ... }

        public override void Start()
        { ... }

        public override void Resume()
        { ... }

        private void ProduceEvents()
        { ... }
    }
Using the constructor method for an untyped adapter, such as TextFilePointInput
as in the example, you can pass the configuration parameters from the adapter
factory and the event type object that passes from the query binding. The
constructor also includes code to connect to the event source and to map fields
to the event payload. After the CEP server instantiates the adapter, it invokes
the Start() method, which generally calls the ProduceEvents() or
ConsumeEvents() method to begin receiving streams. The Resume() method invokes
the ProduceEvents() or ConsumeEvents() method again if the CEP server paused the
streaming and confirms that the adapter is ready.
The core transformation and queuing of events occurs in the ProduceEvents()
method of an input adapter or the ConsumeEvents() method of an output adapter.
The method loops, either reading events from the source or writing events to the
event consumer, and calls Enqueue() to push events into the event stream or
Dequeue() to pull events from it. Calls to Enqueue() and Dequeue() return the
state of the adapter. If Enqueue() returns FULL or Dequeue() returns EMPTY, the
adapter transitions to a suspended state and can no longer produce or consume
events. When the adapter is ready to resume, it calls Ready(), which then causes
the server to call Resume(), and the cycle of enqueuing and dequeuing begins
again from the point in time at which the adapter was suspended.
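The suspend-and-resume cycle described above can be imitated without the StreamInsight assemblies. The following is a simplified simulation, not real adapter code: SketchServerQueue, SketchInputAdapter, and SketchEnqueueResult are hypothetical stand-ins, the calling code plays the role of the CEP server, and the Ready() handshake is collapsed into a direct call to Resume().

```csharp
using System;
using System.Collections.Generic;

public enum SketchEnqueueResult { Success, Full }

// Hypothetical stand-in for the server-side event queue.
public sealed class SketchServerQueue
{
    private readonly Queue<int> _items = new Queue<int>();
    private readonly int _capacity;

    public SketchServerQueue(int capacity) { _capacity = capacity; }

    public SketchEnqueueResult Enqueue(int item)
    {
        if (_items.Count >= _capacity) return SketchEnqueueResult.Full;
        _items.Enqueue(item);
        return SketchEnqueueResult.Success;
    }

    // Simulates the engine consuming everything that was queued.
    public void Drain() { _items.Clear(); }
}

// Hypothetical input adapter that mirrors the Start/Resume/ProduceEvents structure.
public sealed class SketchInputAdapter
{
    private readonly SketchServerQueue _queue;
    private readonly Queue<int> _source;

    public bool Suspended { get; private set; }
    public int Produced { get; private set; }

    public SketchInputAdapter(SketchServerQueue queue, IEnumerable<int> source)
    {
        _queue = queue;
        _source = new Queue<int>(source);
    }

    public void Start() { ProduceEvents(); }

    public void Resume()
    {
        Suspended = false;
        ProduceEvents();
    }

    private void ProduceEvents()
    {
        while (_source.Count > 0)
        {
            // Peek first so that an event rejected by a full queue is not lost.
            if (_queue.Enqueue(_source.Peek()) == SketchEnqueueResult.Full)
            {
                Suspended = true;   // stop producing until Resume() is called
                return;
            }
            _source.Dequeue();
            Produced++;
        }
    }
}
```

The design point worth noting is that the adapter keeps the rejected event and retries it after Resume(), so production continues from where it was suspended.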
Another task the adapter must perform is classification of an event. That is,
the adapter must specify the event kind as either INSERT or Current Time
Increment (CTI). The adapter adds events with the INSERT event kind to the
stream as it receives data from the source. It uses the CTI event kind to ignore
any additional INSERT events it receives afterward that have a start time
earlier than the timestamp of the CTI event.
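The effect of a CTI on late-arriving events can be shown with a few lines of ordinary C#. This is a conceptual sketch only: CtiSketch and AcceptInserts are hypothetical names, each stream item is reduced to a flag and a timestamp, and dropping the late event (rather than adjusting it) is an assumption of the sketch.

```csharp
using System;
using System.Collections.Generic;

public static class CtiSketch
{
    // Each item is either an INSERT (isCti == false) or a CTI (isCti == true).
    // Returns the start times of the INSERT events that are accepted.
    public static List<DateTime> AcceptInserts(IEnumerable<(bool isCti, DateTime time)> items)
    {
        var accepted = new List<DateTime>();
        DateTime currentTime = DateTime.MinValue;   // timestamp of the latest CTI

        foreach (var (isCti, time) in items)
        {
            if (isCti)
            {
                // A CTI advances application time; it never moves backward.
                if (time > currentTime) currentTime = time;
            }
            else if (time >= currentTime)
            {
                accepted.Add(time);
            }
            // else: the INSERT starts before the latest CTI and is ignored.
        }
        return accepted;
    }
}
```

An INSERT whose start time equals the CTI timestamp is kept, because the text only excludes events that start earlier than the CTI.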
Query Templates
Query templates encapsulate the business logic that the CEP server instantiates
as a standing query instance to process, filter, and aggregate event streams. To
define a query template, you first create an event stream object. In a
standalone server environment, you can create and register a query template as
an object on the CEP server for reuse.
The Event Stream Object
You can create an event stream object from an unbound stream or a user-defined
input adapter factory.
You might want to develop a query template to register on the CEP server without
binding it to an adapter. In this case, you can use the Create() method of the
CepStream class to obtain an event stream that has a defined shape, but
without binding information. To do this, you can adapt the following code:
    CepStream<PayloadType> inputStream =
        CepStream<PayloadType>.Create("inputStream");
If you are using the implicit server development model, you can create an event
stream object from an input adapter factory and an input configuration. With
this approach, you do not need to implement an adapter, but you must specify the
event shape. The following example illustrates the syntax to use:
    CepStream<PayloadType> inputStream =
        CepStream<PayloadType>.Create(streamName, typeof(AdapterFactory),
            myConfig, EventShape.Point);
The QueryTemplate Object
When you use the explicit server development model for standalone server
deployment, you can create a QueryTemplate object that you can reuse in multiple
bindings with different input and output adapters. To create a QueryTemplate
object, you use code similar to the following example:
    QueryTemplate myQueryTemplate =
        application.CreateQueryTemplate("myQueryTemplate", outputStream);
Queries
After you create an event stream object, you write a LINQ expression on top of
the event stream object. You use LINQ expressions to define the fields for
output events, to filter events before query processing, to group events into
subsets, and to perform calculations, aggregations, and ranking. You can even
use LINQ expressions to combine events from multiple streams through join or
union operations. Think of LINQ expressions as the questions you ask of the
streaming data.
Projection
The projection operation, which occurs in the select clause of the LINQ
expression, allows you to add more fields to the payload or apply calculations
to the input event fields. You then project the results into a new event by
using field assignments. You can create a new event type implicitly in the
expressions, or you can refer to an existing event type explicitly. Consider an
example in which you need to increment the fields x and y from every event in
the inputStream stream by one. The following code example shows how to use field
assignments to implicitly define a new event type by using projection:
    var outputStream = from e in inputStream
                       select new { x = e.x + 1, y = e.y + 1 };
To refer to an existing event type, you cannot
use the type's constructor; you must use field assignments in an expression. For
example, assume you have an existing event type called myEventType. You can
change the previous code example as shown here to reference the event type
explicitly:
    var outputStream = from e in inputStream
                       select new myEventType { x = e.x + 1, y = e.y + 1 };
Filtering
You use a filtering operation on a stream when you want to apply operations to a
subset of events and discard all other events. All events for which the
expression in the where clause evaluates as true pass to the output stream. In
the following example, the query selects events where the value in field x
equals 5:
    var outputStream = from e in inputStream
                       where e.x == 5
                       select e;
Event Windows
A window represents a subset of data from an event stream for a period of time.
After you create a stream of windows, you can perform aggregation, TopK (a LINQ
operation described later in this chapter), or user-defined operations on the
events that the windows contain. For example, you can count the number of events
in each window.
You might be inclined to think of a window as a way to partition the event
stream by time. However, the analogy between a window and a partition is useful
only up to a point. When you partition records in a table, a record belongs to
one and only one partition, but an event can appear in multiple windows based on
its start time and end time. That is, the window that covers the time period
that includes an event's start time might not include the event's end time. In
that case, the event appears in each subsequent window, with the final window
covering the period that includes the event's end time. Therefore, you should
instead think of a window as a way to partition time that is useful for
performing operations on events occurring between the two points of time that
define a window.
In Figure 8-2, each unlabeled box below the input stream represents a window and
contains multiple events for the period of time that the window covers. In this
example, the input stream contains three events, but the first three windows
contain two events and the last window contains only one event. Thus, a count
aggregation on each window yields results different from a count aggregation on
an input stream.
FIGURE 8-2 Event windows in an input stream
As you might guess, the key to working with windows is to have a clear
understanding of the time span that each window covers. StreamInsight supports
three types of window streams: hopping windows, snapshot windows, and count
windows. In a hopping windows stream, each window spans an equal time period. In
a snapshot windows stream, the size of a window depends on the events that it
contains. In a count windows stream, the size of a window is also not fixed; it
varies according to a specified number of consecutive event start times.
To create a hopping window, you specify both the time span that the window
covers (also known as window size) and the time span between the start of one
window and the start of the next window (also known as hop size). For example,
assume that you need to create windows that cover a period of one hour, and a
new window starts every 15 minutes, as shown in Figure 8-3. In this case, the
window size is one hour and the hop size is 15 minutes. Here is the code to
create a hopping windows stream and count the events in each window:
    var outputStream = from eventWindow in
                           inputStream.HoppingWindow(TimeSpan.FromHours(1),
                               TimeSpan.FromMinutes(15))
                       select new { count = eventWindow.Count() };
FIGURE 8-3 Hopping windows
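To see why one event can land in several hopping windows, it helps to compute the window membership by hand. The following is a minimal sketch that does not use StreamInsight: HoppingSketch and WindowsFor are hypothetical names, and aligning the first window to a caller-supplied origin is an assumption of the sketch.

```csharp
using System;
using System.Collections.Generic;

public static class HoppingSketch
{
    // Returns the start times of all hopping windows [ws, ws + size) that overlap
    // the event lifetime [start, end). Windows begin at origin + k * hop, k >= 0.
    public static List<DateTime> WindowsFor(DateTime origin, TimeSpan size, TimeSpan hop,
                                            DateTime start, DateTime end)
    {
        if (size <= TimeSpan.Zero || hop <= TimeSpan.Zero)
            throw new ArgumentException("Window size and hop size must be positive.");

        var result = new List<DateTime>();
        for (DateTime ws = origin; ws < end; ws += hop)
        {
            // The window overlaps the event if it ends after the event starts.
            if (ws + size > start) result.Add(ws);
        }
        return result;
    }
}
```

With a window size of one hour and a hop size of 15 minutes, an event at 01:05 falls into the four windows that start at 00:15, 00:30, 00:45, and 01:00. When the window size equals the hop size, the same event falls into exactly one window.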
When there are no gaps and there is no overlap between the windows in the
stream, hopping windows are also called tumbling windows. Figure 8-2, shown
earlier, provides an example of tumbling windows. The window size and hop size
are the same in a tumbling
windows stream. Although you can use the HoppingWindow method to create tumbling
windows, there is also a dedicated TumblingWindow method. The following code
illustrates how to count events in tumbling windows that occur every half hour:
    var outputStream = from eventWindow in
                           inputStream.TumblingWindow(TimeSpan.FromMinutes(30))
                       select new { count = eventWindow.Count() };
Snapshot windows are similar to tumbling windows in that the windows do not
overlap, but whereas fixed points in time determine the boundaries of a tumbling
window, events define the boundaries of a snapshot window. Consider the example
in Figure 8-4. At the start of the first event, a new snapshot window starts.
That window ends when the second event starts, and a second snapshot window
starts and includes both the first and second event. When the first event ends,
the second snapshot also ends, and a third snapshot window starts. Thus, the
start and stop of an event triggers the start and stop of a window. Because
events determine the size of the window, the Snapshot method takes no arguments,
as shown in the following code, which counts events in each window:
    var outputStream = from eventWindow in inputStream.Snapshot()
                       select new { count = eventWindow.Count() };
FIGURE 8-4 Snapshot windows
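The way event boundaries carve time into snapshot windows can be reproduced with in-memory LINQ. This is an illustrative sketch, not StreamInsight code: SnapshotSketch is a hypothetical name, timestamps are plain integers for readability, and skipping gaps that contain no events is an assumption of the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotSketch
{
    // Each event has a lifetime [start, end). Returns one window per pair of
    // adjacent boundaries, together with the number of events it contains.
    public static List<(int start, int end, int count)> Windows(
        IEnumerable<(int start, int end)> events)
    {
        var all = events.ToArray();

        // Every event start time or end time is a window boundary.
        var edges = all.SelectMany(e => new[] { e.start, e.end })
                       .Distinct()
                       .OrderBy(t => t)
                       .ToList();

        var windows = new List<(int start, int end, int count)>();
        for (int i = 0; i + 1 < edges.Count; i++)
        {
            int ws = edges[i];
            int we = edges[i + 1];
            int count = all.Count(e => e.start < we && e.end > ws);   // overlap test
            if (count > 0) windows.Add((ws, we, count));
        }
        return windows;
    }
}
```

For two events with lifetimes [0, 10) and [5, 15), the sketch yields three windows: [0, 5) with one event, [5, 10) with two events, and [10, 15) with one event, which matches the sequence described for Figure 8-4.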
Count windows are completely different from the other window types because the
size of the windows is variable. When you create windows, you provide a
parameter n as a count of events to fulfill within a window. For example, assume
n is 2 as shown in Figure 8-5. The first window starts when the first event
starts and ends when the second event starts, because a count of 2 events
fulfills the specification. The second event also resets the counter to 1 and
starts a new window. The third event increments the counter to 2, which ends the
second window.
FIGURE 8-5 Count windows
Aggregations
You cannot perform aggregation operations on event streams directly; instead you
must first create a window to group data into periods of time that you can then
aggregate. You then create an aggregation as a method of the window and, for all
aggregations except Count, use a lambda expression to assign the result to a
field.
StreamInsight supports the following aggregation functions: Avg, Count, Max,
Min, and Sum.
Assume you want to apply the Sum and Avg
aggregations to field x in an input stream. The following example shows you how
to use these aggregations as well as the Count aggregation for each snapshot
window:
    var outputStream = from eventWindow in inputStream.Snapshot()
                       select new
                       {
                           sum = eventWindow.Sum(e => e.x),
                           avg = eventWindow.Avg(e => e.x),
                           count = eventWindow.Count()
                       };
TopK
A special type of aggregation is the TopK operation, which you use to rank and
filter events in an ordered window stream. To order a window stream, you use the
orderby clause. Then you use the Take method to specify the number of events
that you want to send to the output stream, discarding all other events. The
following code shows how to produce a stream of the top three events:
    var outputStream = (from eventWindow in inputStream.Snapshot()
                        from e in eventWindow
                        orderby e.x ascending, e.y descending
                        select e).Take(3);
When you need to include the rank in the output stream, you use projection to
add the rank to each event's payload. Inside the projection, the original event
fields are accessible through the Payload property and the rank through the Rank
property, as shown in the following code:
    var outputStream = (from eventWindow in inputStream.Snapshot()
                        from e in eventWindow
                        orderby e.x ascending, e.y descending
                        select e).Take(3, e => new { x = e.Payload.x,
                            y = e.Payload.y, rank = e.Rank });
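The ordering and ranking logic of TopK can be tried on a single window's worth of data with standard LINQ to Objects. The following is a sketch rather than StreamInsight code: TopKSketch is a hypothetical name, the window is an ordinary in-memory sequence, and numbering ranks from 1 is an assumption of the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TopKSketch
{
    // Orders by x ascending, then y descending, keeps the first k events,
    // and attaches a rank to each one.
    public static List<(int x, int y, int rank)> TopK(
        IEnumerable<(int x, int y)> window, int k)
    {
        return window.OrderBy(e => e.x)
                     .ThenByDescending(e => e.y)
                     .Take(k)
                     .Select((e, i) => (x: e.x, y: e.y, rank: i + 1))
                     .ToList();
    }
}
```

Given the events (3, 1), (1, 2), (1, 9), and (2, 5) with k set to 3, the sketch keeps (1, 9), (1, 2), and (2, 5) in that order and discards (3, 1).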
Grouping
When you want to compute operations on event groups separately, you add a group
by clause. For example, you might want to produce an output stream that
aggregates the input stream by location and compute the average for field x for
each location. In the following example, the code illustrates how to create the
grouping by location and how to aggregate events over a specified column:
    var outputStream = from e in inputStream
                       group e by e.locationID into eachLocation
                       from eventWindow in eachLocation.Snapshot()
                       select new
                       {
                           avgValue = eventWindow.Avg(e => e.x),
                           locationId = eachLocation.Key   // key of the current group
                       };
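The per-group aggregation can be checked against ordinary LINQ to Objects on a small sample before you run it on a stream. This sketch uses only the .NET base class library; GroupingSketch and AverageByLocation are hypothetical names, and the input stands for the events of a single window.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GroupingSketch
{
    // Computes the average of x separately for each location in one window.
    public static Dictionary<int, double> AverageByLocation(
        IEnumerable<(int locationId, double x)> window)
    {
        return window.GroupBy(e => e.locationId)
                     .ToDictionary(g => g.Key, g => g.Average(e => e.x));
    }
}
```

For the events (1, 2.0), (1, 4.0), and (2, 10.0), location 1 averages to 3.0 and location 2 to 10.0; neither group affects the other.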
Joins
You can use a join operation to match events from two streams. The CEP server
first matches events only if they have overlapping time intervals, and then
applies the conditions that you specify in the join predicate. The output of a
join operation is a new event that combines payloads from the two matched
events. Here is the code to join events from two input streams, where field x is
the same value in each event. This code creates a new event containing fields x
and y from the first event and field y from the second event. Because both
events have a field named y, the projection assigns each one a distinct name.
    var outputStream = from e1 in inputStream1
                       join e2 in inputStream2
                       on e1.x equals e2.x
                       select new { e1.x, y1 = e1.y, y2 = e2.y };
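The two-stage matching rule for joins (overlapping lifetimes first, then the join predicate) can be simulated over in-memory data. The following is a conceptual sketch, not the engine's algorithm: JoinSketch is a hypothetical name, timestamps are plain integers, and lifetimes are treated as half-open intervals [start, end).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class JoinSketch
{
    // Joins two event sequences on x, but only for events whose lifetimes overlap.
    public static List<(int x, int y1, int y2)> Join(
        IEnumerable<(int start, int end, int x, int y)> left,
        IEnumerable<(int start, int end, int x, int y)> right)
    {
        return (from e1 in left
                from e2 in right
                where e1.start < e2.end && e2.start < e1.end   // lifetimes overlap
                where e1.x == e2.x                             // join predicate
                select (x: e1.x, y1: e1.y, y2: e2.y)).ToList();
    }
}
```

Two events with the same value of x still produce no output if their lifetimes never overlap, which is the main difference from a relational join.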
Another option is to use a cross join, which combines all events in the first
input stream with all events in the second input stream. You specify a cross
join by using a from clause for each input stream and then creating a new event
that includes fields from the events in each stream. By adding a where clause,
you can filter the events in each stream before the CEP server performs the
cross join. The following example selects events with a value for field x
greater than 5 from the first stream and selects events with a value for field y
less than 20 from the second stream, performs the cross join, and then creates a
stream of new events containing field x from the first event and field y from
the second event:
    var outputStream = from e1 in inputStream1
                       from e2 in inputStream2
                       where e1.x > 5 && e2.y < 20
                       select new { e1.x, e2.y };
Unions
You can also combine events from multiple streams by performing a union
operation. You can work with only two streams at a time, but you can cascade a
series of union operations if you need to combine events from three or more
streams, as shown in the following code:
    var outputStreamTemp = inputStream1.Union(inputStream2);
    var outputStream = outputStreamTemp.Union(inputStream3);
User-defined Functions
When you need to perform an operation that the CEP server does not natively
support, you can create user-defined functions (UDFs) by reusing existing .NET
functions. You add a UDF to the CEP server in the same way that you add an
adapter. You can then call the UDF anywhere in your query where an expression
can be used, such as in a filter predicate, a join predicate, or a projection.
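The following sketch shows the general shape of such a function: an ordinary static .NET method that a query calls inside a filter predicate. The MyFunctions class, the IsOutOfRange method, and the thresholds are hypothetical, and the query runs over an in-memory list with LINQ to Objects rather than on the CEP server, so it does not show how the function is deployed to the server.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MyFunctions
{
    // Hypothetical UDF: true when value lies outside the range [low, high].
    public static bool IsOutOfRange(double value, double low, double high)
    {
        return value < low || value > high;
    }
}

public static class UdfSketch
{
    // Keeps only the readings that the UDF flags as out of range.
    public static List<double> Alerts(IEnumerable<double> readings)
    {
        var alerts = from r in readings
                     where MyFunctions.IsOutOfRange(r, 0.0, 100.0)
                     select r;
        return alerts.ToList();
    }

    public static void Main()
    {
        var readings = new List<double> { -5.0, 42.0, 100.0, 130.0 };

        // 42 and 100 fall inside [0, 100], so only the other two are listed.
        foreach (var value in Alerts(readings))
        {
            Console.WriteLine(value);
        }
    }
}
```

Keeping the function static and free of side effects makes it usable in any expression position, whether that is a filter, a join predicate, or a projection.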
Query Template Binding
The method that the CEP server uses to instantiate the query template as a
standing query depends on the development model that you use. In the explicit
server development model, you create a query binder object. In the implicit
server development model, you create an event stream consumer object instead.
The Query Binder Object
In the explicit server development model, you first create explicit input and
output adapter objects. Next, you create a query binder object as a wrapper for
the query template object on the CEP server and bind it to the input and output
adapters. Finally, you call the CreateQuery() method to create the standing
query, as shown here:
QueryBinder myQuerybinder = new QueryBinder(myQueryTemplate);
myQuerybinder.BindProducer("querySource", myInputAdapter, inputConf,
    EventShape.Point);
myQuerybinder.AddConsumer("queryResult", myOutputAdapter, outputConf,
    EventShape.Point, StreamEventOrder.FullyOrdered);
Query myQuery = application.CreateQuery("query", myQuerybinder,
    "query description");
Rather than enqueuing CTIs in the input adapter code, you can define the CTI
behavior by using the AdvanceTimeSettings class as an optional parameter in the
BindProducer method. For example, to send a CTI after every 10 events, set the
CTI's timestamp as the most recent event's timestamp, and drop any event that
appears later in the stream but has an end timestamp earlier than the CTI, use
the following code:
var ats = new AdvanceTimeSettings(10, TimeSpan.FromSeconds(0),
    AdvanceTimePolicy.Drop);
myQuerybinder.BindProducer("querySource", myInputAdapter, inputConf,
    EventShape.Interval, ats);
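The effect of these settings can be approximated with a small simulation. The sketch below is not StreamInsight code: it uses plain integers as timestamps, treats every event as a point event, and assumes that dropped events do not count toward the next CTI. The CtiSketch class and its Apply method are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class CtiSketch
{
    // Simulates "issue a CTI after every n accepted events, stamped with the
    // most recent event's timestamp, and drop later events that fall before
    // the current CTI". Returns the timestamps that are accepted.
    public static List<int> Apply(IEnumerable<int> arrivals, int n)
    {
        var accepted = new List<int>();
        int? cti = null;          // no CTI has been issued yet
        int sinceLastCti = 0;     // accepted events since the last CTI

        foreach (var timestamp in arrivals)
        {
            // Drop policy: discard an event that is earlier than the CTI.
            if (cti.HasValue && timestamp < cti.Value)
            {
                continue;
            }

            accepted.Add(timestamp);
            sinceLastCti++;

            // After n accepted events, advance time to this event's timestamp.
            if (sinceLastCti == n)
            {
                cti = timestamp;
                sinceLastCti = 0;
            }
        }
        return accepted;
    }

    public static void Main()
    {
        // Events arrive out of order; a CTI is issued after every 3 events.
        var arrivals = new[] { 1, 2, 5, 4, 6, 3, 7 };
        Console.WriteLine(string.Join(", ", Apply(arrivals, 3)));
    }
}
```

In this run the first CTI is issued at timestamp 5, so the late arrivals 4 and 3 are discarded while 6 and 7 pass through. A larger event count between CTIs tolerates more disorder at the cost of delaying results.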
The Event Stream Consumer Object
After you define the query logic in an application that uses the implicit server
development model, you can use the output adapter factory to create an event
stream consumer object. You can pass this object directly to the
CepStream.ToQuery() method without binding the query template to the output
adapter, as you can see in the following example:
Query myQuery = outputStream.ToQuery<ResultType>(
    typeof(MyOutputAdapterFactory), outputConf,
    EventShape.Interval, StreamEventOrder.FullyOrdered);
The Query Object
In both the explicit and implicit development models, you create a query object.
With that object instantiated, you can use the Start() and Stop() methods. The
Start() method instantiates the adapters using the adapter factories, starts the
event processing engine, and calls the Start() method of each adapter. The
Stop() method sends a message to the adapters that the query is stopping and
then shuts down the query. Your application must include the following code to
start and stop the query object:
query.Start();
// wait for signal to complete the query
query.Stop();
The Management Interface
StreamInsight includes the ManagementService API, which you can use to create
diagnostic views for monitoring the CEP server's resources and the queries
running on the server. Another option is to use Windows PowerShell to access
diagnostic information.
Diagnostic Views
Your diagnostic application can retrieve static information, such as object
property values, and statistical information, such as a cumulative event count
after a particular point in time or an aggregate count of events from child
objects. Objects include the server, input and output adapters, query operators,
schedulers, and event streams. You can retrieve the desired information by using
the GetDiagnosticView() method and passing the object's URI as a method
argument.
If you are monitoring queries, you should understand the transition points at
which the server records metrics about events in a stream. The name of a query
metric identifies the transition point to which the metric applies. For example,
Total Outgoing Event Count provides the total number of events that the output
adapter has dequeued from the engine. The following four transition points
relate to query metrics:
- Incoming: The event arrival at the input adapter
- Consumed: The point at which the input adapter enqueues the event into the engine
- Produced: The point at which the event leaves the last query operator in the engine
- Outgoing: The event departure from the output adapter
Windows PowerShell Diagnostics
For quick analysis, you can use Windows PowerShell scripts to view diagnostic
information rather than writing a complete diagnostic application. Before you
can use a Windows PowerShell script, the StreamInsight server must be running a
query. If the server is running as a hosted assembly, you must expose the Web
service.
You start the diagnostic process by loading the Microsoft.ComplexEventProcessing
assembly from the Global Assembly Cache (GAC) into Windows PowerShell by using
the following code:
PS C:\> [System.Reflection.Assembly]::LoadWithPartialName("Microsoft.ComplexEventProcessing")
Then you need to create a connection to the StreamInsight host process by using
the code in this example:
PS C:\> $server = [Microsoft.ComplexEventProcessing.Server]::Connect("http://localhost/StreamInsight")
Then you can use the GetDiagnosticView() method to retrieve statistics for an
object, such as the Event Manager, as shown in the following code:
PS C:\> $dv = $server.GetDiagnosticView("cep:/Server/EventManager")
PS C:\> $dv
To retrieve information about a query, you must provide the full name, following
the StreamInsight hierarchical naming schema. For example, for an application
named myApplication with a query named myQuery, you use the following code:
PS C:\> $dv = $server.GetDiagnosticView("cep:/Server/Application/myApplication/Query/myQuery")
PS C:\> $dv
NOTE For a complete list of metrics and statistics that you can query by
using diagnostic views, refer to the SQL Server Books Online topic "Monitoring
the CEP Server and Queries" at
http://msdn.microsoft.com/en-us/library/ee391166(SQL.105).aspx.