1+ using System ;
2+ using System . Threading ;
3+ using System . Threading . Tasks ;
4+ using System . Management . Automation ;
5+ using System . Collections . Concurrent ;
6+ using System . Runtime . ExceptionServices ;
7+ using Microsoft . SqlServer . XEvent . XELite ;
8+
9+ namespace Dataplat . Dbatools . Commands
10+ {
11+ /// <summary>
12+ /// Implements the <c>Read-XEvent</c> internal command
13+ /// </summary>
14+ [ Cmdlet ( "Read" , "XEvent" , DefaultParameterSetName = "Default" , RemotingCapability = RemotingCapability . PowerShell ) ]
15+ public class ReadXEvent : PSCmdlet
16+ {
17+ #region Parameters
18+ /// <summary>
19+ /// The FileName of the .XEL
20+ /// </summary>
21+ [ Parameter ( ValueFromPipeline = true ) ]
22+ [ Alias ( "FullName" ) ]
23+ public string FileName ;
24+
25+ /// <summary>
26+ /// The ConnectionString to to SQL Instance
27+ /// </summary>
28+ [ Parameter ( ) ]
29+ public string ConnectionString ;
30+
31+ /// <summary>
32+ /// The session name of the XE
33+ /// </summary>
34+ [ Parameter ( ) ]
35+ public string SessionName ;
36+ #endregion Parameters
37+
38+ #region Private Methods
39+
40+ private readonly CancellationTokenSource CancelToken = new CancellationTokenSource ( ) ;
41+
42+ private void ParseFile ( string FileName )
43+ {
44+ new XEFileEventStreamer ( FileName ) . ReadEventStream ( delegate ( )
45+ {
46+ return Task . CompletedTask ;
47+ } ,
48+ delegate ( IXEvent xevent )
49+ {
50+ WriteObject ( xevent ) ;
51+ return Task . CompletedTask ;
52+ } ,
53+ CancelToken . Token
54+ ) . Wait ( ) ;
55+ }
56+
57+ private void ParseStream ( string ConnectionString , string SessionName )
58+ {
59+ XELiveEventStreamer XELiveEventStreamer = new XELiveEventStreamer ( ConnectionString , SessionName ) ;
60+ BlockingCollection < object > queue = new BlockingCollection < object > ( 255 ) ;
61+
62+ Task task = XELiveEventStreamer . ReadEventStream ( delegate ( )
63+ {
64+ queue . Add ( null , CancelToken . Token ) ;
65+ return Task . CompletedTask ;
66+ } ,
67+ delegate ( IXEvent xevent )
68+ {
69+ queue . Add ( xevent , CancelToken . Token ) ;
70+ return Task . CompletedTask ;
71+ } ,
72+ CancelToken . Token
73+ ) ;
74+
75+ task . ContinueWith ( delegate ( Task newtask )
76+ {
77+ queue . CompleteAdding ( ) ;
78+ }
79+ ) ;
80+
81+ while ( true )
82+ {
83+ try
84+ {
85+ object item = queue . Take ( CancelToken . Token ) ;
86+
87+ if ( item != null )
88+ {
89+ WriteObject ( item ) ;
90+ }
91+ }
92+ catch ( OperationCanceledException )
93+ {
94+ break ;
95+ }
96+ catch ( InvalidOperationException ex )
97+ {
98+ if ( ! CancelToken . IsCancellationRequested )
99+ {
100+ ExceptionDispatchInfo . Capture ( ex ) . Throw ( ) ;
101+ }
102+ break ;
103+ }
104+ }
105+ }
106+
107+ #endregion Private Methods
108+
109+ #region Command Implementation
110+ /// <summary>
111+ /// Implements the begin action of the command
112+ /// </summary>
113+ protected override void BeginProcessing ( )
114+ {
115+
116+ }
117+
118+ /// <summary>
119+ /// Implements the process action of the command
120+ /// </summary>
121+ protected override void ProcessRecord ( )
122+ {
123+ if ( FileName != null )
124+ {
125+ ParseFile ( FileName ) ;
126+ }
127+ else
128+ {
129+ // ps checks to ensure both ConnectionString and SessionName exist
130+ ParseStream ( ConnectionString , SessionName ) ;
131+ }
132+ }
133+
134+ /// <summary>
135+ /// Implements the end action of the command
136+ /// </summary>
137+ protected override void EndProcessing ( )
138+ {
139+ }
140+ #endregion Command Implementation
141+ }
142+ }
0 commit comments