diff --git a/TableDependency.SqlClient/Base/Abstracts/ITableDependency.cs b/TableDependency.SqlClient/Base/Abstracts/ITableDependency.cs index 0ea0ee8..cc26060 100644 --- a/TableDependency.SqlClient/Base/Abstracts/ITableDependency.cs +++ b/TableDependency.SqlClient/Base/Abstracts/ITableDependency.cs @@ -1,4 +1,5 @@ #region License + // TableDependency, SqlTableDependency // Copyright (c) 2015-2020 Christian Del Bianco. All rights reserved. // @@ -22,7 +23,8 @@ // WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR // OTHER DEALINGS IN THE SOFTWARE. -#endregion + +#endregion License using System; using System.Diagnostics; @@ -39,16 +41,18 @@ public interface ITableDependency : IDisposable #region Events event ErrorEventHandler OnError; + event StatusEventHandler OnStatusChanged; - #endregion + #endregion Events #region Methods void Start(int timeOut = 120, int watchDogTimeOut = 180); + void Stop(); - #endregion + #endregion Methods #region Properties @@ -61,7 +65,7 @@ public interface ITableDependency : IDisposable string TableName { get; } string SchemaName { get; } - #endregion + #endregion Properties } public interface ITableDependency : ITableDependency where T : class, new() @@ -70,6 +74,15 @@ public interface ITableDependency : IDisposable event ChangedEventHandler OnChanged; - #endregion + #endregion Events + } + + public interface IDynamicTableDependency : ITableDependency + { + #region Events + + event ChangedEventHandler OnChanged; + + #endregion Events } } \ No newline at end of file diff --git a/TableDependency.SqlClient/Base/Delegates/ChangedEventHandler.cs b/TableDependency.SqlClient/Base/Delegates/ChangedEventHandler.cs index 58c8186..efa67e9 100644 --- a/TableDependency.SqlClient/Base/Delegates/ChangedEventHandler.cs +++ b/TableDependency.SqlClient/Base/Delegates/ChangedEventHandler.cs @@ -1,4 +1,5 @@ #region License + // TableDependency, SqlTableDependency // Copyright (c) 2015-2020 Christian Del Bianco. All rights reserved. // @@ -22,11 +23,14 @@ // WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR // OTHER DEALINGS IN THE SOFTWARE. -#endregion + +#endregion License using TableDependency.SqlClient.Base.EventArgs; namespace TableDependency.SqlClient.Base.Delegates { + public delegate void ChangedEventHandler(object sender, DynamicRecordChangedEventArgs e); + public delegate void ChangedEventHandler(object sender, RecordChangedEventArgs e) where T : class, new(); } \ No newline at end of file diff --git a/TableDependency.SqlClient/Base/DynamicTableDependency.cs b/TableDependency.SqlClient/Base/DynamicTableDependency.cs new file mode 100644 index 0000000..aabf56f --- /dev/null +++ b/TableDependency.SqlClient/Base/DynamicTableDependency.cs @@ -0,0 +1,444 @@ +#region License + +// TableDependency, SqlTableDependency +// Copyright (c) 2015-2020 Christian Del Bianco. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person +// obtaining a copy of this software and associated documentation +// files (the "Software"), to deal in the Software without +// restriction, including without limitation the rights to use, +// copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +// OTHER DEALINGS IN THE SOFTWARE. + +#endregion License + +using System; +using System.Collections.Generic; +using System.ComponentModel.DataAnnotations.Schema; +using System.Data; +using System.Diagnostics; +using System.Globalization; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +using TableDependency.SqlClient.Base.Abstracts; +using TableDependency.SqlClient.Base.Delegates; +using TableDependency.SqlClient.Base.Enums; +using TableDependency.SqlClient.Base.EventArgs; +using TableDependency.SqlClient.Base.Exceptions; +using TableDependency.SqlClient.Base.Messages; +using TableDependency.SqlClient.Base.Utilities; + +namespace TableDependency.SqlClient.Base +{ + public abstract class DynamicTableDependency : IDynamicTableDependency + { + #region Instance Variables + + /// protected IModelToTableMapper _mapper; + protected CancellationTokenSource _cancellationTokenSource; + + protected string _connectionString; + protected string _tableName; + protected string _schemaName; + protected string _server; + protected string _database; + protected Task _task; + protected IList _processableMessages; + protected TableDependencyStatus _status; + protected DmlTriggerType _dmlTriggerType; + protected ITableDependencyFilter _filter; + protected bool _disposed; + protected string _dataBaseObjectsNamingConvention; + protected bool _databaseObjectsCreated; + + #endregion Instance Variables + + #region Events + + /// + /// Occurs when an error happen during listening for changes on monitored table. + /// + public abstract event ErrorEventHandler OnError; + + /// + /// Occurs when the table content has been changed with an update, insert or delete operation. + /// + public abstract event ChangedEventHandler OnChanged; + + /// + /// Occurs when SqlTableDependency changes. + /// + public abstract event StatusEventHandler OnStatusChanged; + + #endregion Events + + #region Properties + + /// + /// Gets the ModelToTableMapper. + /// + /// public IModelToTableMapper Mapper => _mapper; + + /// + /// Gets or sets the trace switch. + /// + /// + /// The trace switch. + /// + public TraceLevel TraceLevel { get; set; } = TraceLevel.Off; + + /// + /// Gets or Sets the TraceListener. + /// + /// + /// The logger. + /// + public TraceListener TraceListener { get; set; } + + /// + /// Gets or sets the culture info. + /// + /// + /// The culture information five letters iso code. + /// + public CultureInfo CultureInfo { get; set; } = new CultureInfo("en-US"); + + /// + /// Gets or sets the encoding use to convert database strings. + /// + /// + /// The encoding. + /// + public Encoding Encoding { get; set; } + + /// + /// Return the database objects naming convention for created objects used to receive notifications. + /// + /// + /// The data base objects naming. + /// + public string DataBaseObjectsNamingConvention => new string(_dataBaseObjectsNamingConvention.ToCharArray()); + + /// + /// Gets the SqlTableDependency status. + /// + /// + /// The TableDependencyStatus enumeration status. + /// + public TableDependencyStatus Status => _status; + + /// + /// Gets name of the table. + /// + /// + /// The name of the table. + /// + public string TableName => _tableName; + + /// + /// Gets or sets the name of the schema. + /// + /// + /// The name of the schema. + /// + public string SchemaName => _schemaName; + + #endregion Properties + + #region Constructors + + protected DynamicTableDependency( + string connectionString, + string tableName = null, + string schemaName = null, + /// IUpdateOfModel updateOf = null, + ITableDependencyFilter filter = null, + DmlTriggerType dmlTriggerType = DmlTriggerType.All, + bool executeUserPermissionCheck = true) + { + // if (updateOf?.Count() == 0) throw new UpdateOfException("updateOf parameter is empty."); + + _connectionString = connectionString; + this.CheckIfConnectionStringIsValid(); + if (executeUserPermissionCheck) this.CheckIfUserHasPermissions(); + + _tableName = this.GetTableName(tableName); + _schemaName = this.GetSchemaName(schemaName); + _server = this.GetServerName(); + _database = this.GetDataBaseName(); + + this.CheckIfTableExists(); + this.CheckRdbmsDependentImplementation(); + + var tableColumnList = this.GetTableColumnsList(); + if (!tableColumnList.Any()) throw new TableWithNoColumnsException(_tableName); + + this.CheckUpdateOfCongruenceWithTriggerType(dmlTriggerType); + + _dataBaseObjectsNamingConvention = this.GetBaseObjectsNamingConvention(); + _dmlTriggerType = dmlTriggerType; + _filter = filter; + } + + #endregion Constructors + + #region Public methods + + /// + /// Starts monitoring table's content changes. + /// + /// The WAITFOR timeout in seconds. + /// The WATCHDOG timeout in seconds. + public abstract void Start(int timeOut = 120, int watchDogTimeOut = 180); + + /// + /// Stops monitoring table's content changes. + /// + public abstract void Stop(); + + #endregion Public methods + + #region Logging + + protected virtual string FormatTraceMessageHeader() + { + return $"{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff} [Server: {_server} Database: {_database}]"; + } + + protected string DumpException(Exception exception) + { + var sb = new StringBuilder(); + + sb.AppendLine(Environment.NewLine); + sb.AppendLine("EXCEPTION:"); + sb.AppendLine(exception.GetType().Name); + sb.AppendLine(exception.Message); + sb.AppendLine(exception.StackTrace); + + var innerException = exception.InnerException; + if (innerException != null) AddInnerException(sb, innerException); + + return sb.ToString(); + } + + protected static void AddInnerException(StringBuilder sb, Exception exception) + { + while (true) + { + sb.AppendLine(Environment.NewLine); + sb.AppendLine("INNER EXCEPTION:"); + sb.AppendLine(exception.GetType().Name); + sb.AppendLine(exception.Message); + sb.AppendLine(exception.StackTrace); + + var innerException = exception.InnerException; + if (innerException != null) + { + exception = innerException; + continue; + } + + break; + } + } + + protected virtual void WriteTraceMessage(TraceLevel traceLevel, string message, Exception exception = null) + { + try + { + if (this.TraceListener == null) return; + if (this.TraceLevel < TraceLevel.Off || this.TraceLevel > TraceLevel.Verbose) return; + + if (this.TraceLevel >= traceLevel) + { + var messageToWrite = new StringBuilder(message); + if (exception != null) messageToWrite.Append(this.DumpException(exception)); + this.TraceListener.WriteLine($"{this.FormatTraceMessageHeader()}{messageToWrite}"); + this.TraceListener.Flush(); + } + } + catch + { + // Intentionally ignored + } + } + + #endregion Logging + + #region Checks + + protected virtual void CheckUpdateOfCongruenceWithTriggerType(DmlTriggerType dmlTriggerType) + { + if (!dmlTriggerType.HasFlag(DmlTriggerType.Update) && !dmlTriggerType.HasFlag(DmlTriggerType.All)) + { + throw new DmlTriggerTypeException("updateOf parameter can be specified only if DmlTriggerType parameter contains DmlTriggerType.Update too, not for DmlTriggerType.Delete or DmlTriggerType.Insert only."); + } + } + + protected abstract void CheckIfUserInterestedColumnsCanBeManaged(); + + protected virtual void CheckRdbmsDependentImplementation() + { + } + + protected abstract void CheckIfTableExists(); + + protected abstract void CheckIfUserHasPermissions(); + + protected abstract void CheckIfConnectionStringIsValid(); + + #endregion Checks + + #region Get infos + + protected abstract IEnumerable GetTableColumnsList(); + + protected abstract string GetBaseObjectsNamingConvention(); + + protected abstract string GetDataBaseName(); + + protected abstract string GetServerName(); + + protected abstract string GetTableName(string tableName); + + protected virtual string GetTableNameFromDataAnnotation() + { + var attribute = typeof(DataTable).GetTypeInfo().GetCustomAttribute(typeof(TableAttribute)); + return ((TableAttribute)attribute)?.Name; + } + + protected abstract string GetSchemaName(string schemaName); + + protected virtual DynamicRecordChangedEventArgs GetRecordChangedEventArgs(MessagesBag messagesBag) + { + return new DynamicRecordChangedEventArgs( + messagesBag, + _server, + _database, + _dataBaseObjectsNamingConvention, + this.CultureInfo); + } + + #endregion Get infos + + #region Notifications + + protected void NotifyListenersAboutStatus(Delegate[] onStatusChangedSubscribedList, TableDependencyStatus status) + { + _status = status; + + if (onStatusChangedSubscribedList == null) return; + + foreach (var dlg in onStatusChangedSubscribedList.Where(d => d != null)) + { + try + { + dlg.GetMethodInfo().Invoke(dlg.Target, new object[] { this, new StatusChangedEventArgs(status, _server, _database, _dataBaseObjectsNamingConvention) }); + } + catch + { + // Intentionally ignored + } + } + } + + protected void NotifyListenersAboutError(Delegate[] onErrorSubscribedList, Exception exception) + { + if (onErrorSubscribedList == null) return; + + foreach (var dlg in onErrorSubscribedList.Where(d => d != null)) + { + try + { + dlg.GetMethodInfo().Invoke(dlg.Target, new object[] { this, new ErrorEventArgs(exception, _server, _database, _dataBaseObjectsNamingConvention) }); + } + catch + { + // Intentionally ignored + } + } + } + + protected void NotifyListenersAboutChange(Delegate[] changeSubscribedList, MessagesBag messagesBag) + { + if (changeSubscribedList == null) return; + + foreach (var dlg in changeSubscribedList.Where(d => d != null)) + { + try + { + dlg.GetMethodInfo().Invoke(dlg.Target, new object[] { this, this.GetRecordChangedEventArgs(messagesBag) }); + } + catch (NoMatchBetweenModelAndTableColumns) + { + throw; + } + catch (Exception ex) + { + this.WriteTraceMessage(TraceLevel.Error, $"Received message type = {ex.Message}."); + // Intentionally ignored + } + } + } + + #endregion Notifications + + #region Database object generation/disposition + + protected abstract IList CreateDatabaseObjects(int timeOut, int watchDogTimeOut); + + protected abstract void DropDatabaseObjects(); + + #endregion Database object generation/disposition + + #region IDisposable implementation + + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (_disposed) + { + return; + } + + if (disposing) + { + this.Stop(); + + this.TraceListener?.Dispose(); + } + + _disposed = true; + } + + ~DynamicTableDependency() + { + this.Dispose(false); + } + + #endregion IDisposable implementation + } +} \ No newline at end of file diff --git a/TableDependency.SqlClient/Base/EventArgs/DynamicRecordChangedEventArgs.cs b/TableDependency.SqlClient/Base/EventArgs/DynamicRecordChangedEventArgs.cs new file mode 100644 index 0000000..2246538 --- /dev/null +++ b/TableDependency.SqlClient/Base/EventArgs/DynamicRecordChangedEventArgs.cs @@ -0,0 +1,197 @@ +#region License + +// TableDependency, SqlTableDependency +// Copyright (c) 2015-2020 Christian Del Bianco. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person +// obtaining a copy of this software and associated documentation +// files (the "Software"), to deal in the Software without +// restriction, including without limitation the rights to use, +// copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +// OTHER DEALINGS IN THE SOFTWARE. + +#endregion License + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Reflection; +using TableDependency.SqlClient.Base.Enums; +using TableDependency.SqlClient.Base.Exceptions; +using TableDependency.SqlClient.Base.Messages; +using TableDependency.SqlClient.Base.Utilities; + +namespace TableDependency.SqlClient.Base.EventArgs +{ + public class DynamicRecordChangedEventArgs : BaseEventArgs + { + #region Instance variables + + protected MessagesBag MessagesBag { get; } + + #endregion Instance variables + + #region Properties + + public IDictionary Entity { get; protected set; } + public IDictionary EntityOldValues { get; protected set; } + public ChangeType ChangeType { get; protected set; } + + #endregion Properties + + #region Constructors + + public DynamicRecordChangedEventArgs( + MessagesBag messagesBag, + string server, + string database, + string sender, + CultureInfo cultureInfo, + bool includeOldValues = false) : base(server, database, sender, cultureInfo) + { + this.MessagesBag = messagesBag; + + this.ChangeType = messagesBag.MessageType; + this.Entity = this.MaterializeEntity(messagesBag.Messages.Where(m => !m.IsOldValue).ToList()); + + if (includeOldValues && this.ChangeType == ChangeType.Update) + { + this.EntityOldValues = this.MaterializeEntity(messagesBag.Messages.Where(m => m.IsOldValue).ToList()); + } + else + { + this.EntityOldValues = new Dictionary(); + } + } + + #endregion Constructors + + #region public methods + + public virtual object GetValue(PropertyInfo propertyInfo, TableColumnInfo columnInfo, byte[] message) + { + var stringValue = Convert.ToString(this.MessagesBag.Encoding.GetString(message), base.CultureInfo); + return this.GetValueObject(propertyInfo, stringValue); + } + + #endregion public methods + + #region Protected Methods + + protected virtual object GetValueObject(PropertyInfo propertyInfo, string value) + { + var propertyType = Nullable.GetUnderlyingType(propertyInfo.PropertyType) ?? propertyInfo.PropertyType; + var typeCode = Type.GetTypeCode(propertyType); + + try + { + switch (typeCode) + { + case TypeCode.Boolean: + return bool.Parse(value); + + case TypeCode.Char: + return char.Parse(value); + + case TypeCode.SByte: + return sbyte.Parse(value, base.CultureInfo); + + case TypeCode.Byte: + return byte.Parse(value, base.CultureInfo); + + case TypeCode.Int16: + return short.Parse(value, base.CultureInfo); + + case TypeCode.UInt16: + return ushort.Parse(value, base.CultureInfo); + + case TypeCode.Int32: + return int.Parse(value, base.CultureInfo); + + case TypeCode.UInt32: + return uint.Parse(value, base.CultureInfo); + + case TypeCode.Int64: + return long.Parse(value, base.CultureInfo); + + case TypeCode.UInt64: + return ulong.Parse(value, base.CultureInfo); + + case TypeCode.Single: + return float.Parse(value, base.CultureInfo); + + case TypeCode.Double: + return double.Parse(value, base.CultureInfo); + + case TypeCode.Decimal: + return decimal.Parse(value, base.CultureInfo); + + case TypeCode.DateTime: + return DateTime.Parse(value, base.CultureInfo); + + case TypeCode.String: + return value; + + case TypeCode.Object: + Guid guid; + if (Guid.TryParse(value, out guid)) return guid; + + TimeSpan timeSpan; + if (TimeSpan.TryParse(value, out timeSpan)) return timeSpan; + + DateTimeOffset dateTimeOffset; + if (DateTimeOffset.TryParse(value, out dateTimeOffset)) return dateTimeOffset; + + break; + } + } + catch + { + var errorMessage = $"Propery {propertyInfo.Name} cannot be set with db value {value}"; + throw new NoMatchBetweenModelAndTableColumns(errorMessage); + } + + return null; + } + + protected virtual TableColumnInfo GetColumnInfo(string columnName) + { + return null; + // return this.UserInterestedColumns.First(uic => string.Equals(uic.Name, columnName, StringComparison.CurrentCultureIgnoreCase)); + } + + protected virtual IDictionary MaterializeEntity(List messages) + { + var row = new Dictionary(); + foreach (var message in messages) + { + var stringValue = Convert.ToString(this.MessagesBag.Encoding.GetString(message.Body), base.CultureInfo); + row.Add(message.Recipient, stringValue); + } + + return row; + } + + protected virtual bool IsNullableType(Type type) + { + return type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>); + } + + #endregion Protected Methods + } +} \ No newline at end of file diff --git a/TableDependency.SqlClient/DynamicSqlTableDependency.cs b/TableDependency.SqlClient/DynamicSqlTableDependency.cs new file mode 100644 index 0000000..8f3e761 --- /dev/null +++ b/TableDependency.SqlClient/DynamicSqlTableDependency.cs @@ -0,0 +1,1072 @@ +#region License + +// TableDependency, SqlTableDependency +// Copyright (c) 2015-2020 Christian Del Bianco. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person +// obtaining a copy of this software and associated documentation +// files (the "Software"), to deal in the Software without +// restriction, including without limitation the rights to use, +// copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +// OTHER DEALINGS IN THE SOFTWARE. + +#endregion License + +using System; +using System.Collections.Generic; +using System.Data; +using System.Data.SqlClient; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +using TableDependency.SqlClient.Base; +using TableDependency.SqlClient.Base.Abstracts; +using TableDependency.SqlClient.Base.Delegates; +using TableDependency.SqlClient.Base.Enums; +using TableDependency.SqlClient.Base.EventArgs; +using TableDependency.SqlClient.Base.Exceptions; +using TableDependency.SqlClient.Base.Messages; +using TableDependency.SqlClient.Base.Utilities; +using TableDependency.SqlClient.Enumerations; +using TableDependency.SqlClient.Exceptions; +using TableDependency.SqlClient.Extensions; +using TableDependency.SqlClient.Messages; +using TableDependency.SqlClient.Resources; +using TableDependency.SqlClient.Utilities; + +namespace TableDependency.SqlClient +{ + /// + /// SqlTableDependency class: monitor SQL Server table record changes and notify it. + /// + public class DynamicSqlTableDependency : DynamicTableDependency + { + #region Private variables + + protected readonly bool IncludeOldValues; + protected Guid ConversationHandle; + protected const string StartMessageTemplate = "{0}/StartMessage/{1}"; + protected const string EndMessageTemplate = "{0}/EndMessage"; + + #endregion Private variables + + #region Properties + + /// + /// Gets or sets a value indicating whether activate database logging and event viewer logging. + /// + /// + /// Only a member of the sysadmin fixed server role or a user with ALTER TRACE permissions can use it. + /// + /// + /// true if [activate database logging]; otherwise, false. + /// + public bool ActivateDatabaseLogging { get; set; } + + /// + /// Specifies the owner of the service to the specified database user. + /// When a new service is created it is owned by the principal specified in the AUTHORIZATION clause. Server, database, and schema names cannot be specified. The service_name must be a valid sysname. + /// When the current user is dbo or sa, owner_name may be the name of any valid user or role. + /// Otherwise, owner_name must be the name of the current user, the name of a user that the current user has IMPERSONATE permission for, or the name of a role to which the current user belongs. + /// + public string ServiceAuthorization { get; set; } + + /// + /// Specifies the SQL Server database user account under which the activation stored procedure runs. + /// SQL Server must be able to check the permissions for this user at the time that the queue activates the stored procedure. For aWindows domain user, the server must be connected to the domain + /// when the procedure is activated or when activation fails.For a SQL Server user, Service Broker always checks the permissions.EXECUTE AS SELF means that the stored procedure executes as the current user. + /// + public string QueueExecuteAs { get; set; } = "SELF"; + + #endregion Properties + + #region Events + + /// + /// Occurs when an error happen during listening for changes on monitored table. + /// + public override event ErrorEventHandler OnError; + + /// + /// Occurs when the table content has been changed with an update, insert or delete operation. + /// + public override event ChangedEventHandler OnChanged; + + /// + /// Occurs when an status changes happen. + /// + public override event StatusEventHandler OnStatusChanged; + + #endregion Events + + #region Constructors + + /// + /// Initializes a new instance of the class. + /// + /// The connection string. + /// Name of the table. + /// Name of the schema. + /// The model to database table column mapper. + /// List of columns that need to monitor for changing on order to receive notifications. + /// The filter condition translated in WHERE. + /// The notify on Insert, Delete, Update operation. + /// if set to true [skip user permission check]. + /// if set to true [include old values]. + public DynamicSqlTableDependency( + string connectionString, + string tableName = null, + string schemaName = null, + ITableDependencyFilter filter = null, + DmlTriggerType notifyOn = DmlTriggerType.All, + bool executeUserPermissionCheck = true, + bool includeOldValues = false) : base(connectionString, tableName, schemaName, filter, notifyOn, executeUserPermissionCheck) + { + this.IncludeOldValues = includeOldValues; + } + + #endregion Constructors + + #region Public methods + + /// + /// Starts monitoring table's content changes. + /// + /// The WAITFOR timeout in seconds. + /// The WATCHDOG timeout in seconds. + /// + public override void Start(int timeOut = 120, int watchDogTimeOut = 180) + { + if (timeOut < 60) throw new ArgumentException("timeOut must be greater or equal to 60 seconds"); + if (watchDogTimeOut < 60 || watchDogTimeOut < (timeOut + 60)) throw new WatchDogTimeOutException("watchDogTimeOut must be at least 60 seconds bigger then timeOut"); + if (_task != null) return; + + if (this.OnChanged == null) throw new NoSubscriberException(); + + var onChangedSubscribedList = this.OnChanged?.GetInvocationList(); + var onErrorSubscribedList = this.OnError?.GetInvocationList(); + var onStatusChangedSubscribedList = this.OnStatusChanged?.GetInvocationList(); + + this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.Starting); + + _disposed = false; + _processableMessages = this.CreateDatabaseObjects(timeOut, watchDogTimeOut); + _cancellationTokenSource = new CancellationTokenSource(); + + _task = Task.Factory.StartNew(() => + WaitForNotifications( + _cancellationTokenSource.Token, + onChangedSubscribedList, + onErrorSubscribedList, + onStatusChangedSubscribedList, + timeOut, + watchDogTimeOut), + _cancellationTokenSource.Token); + + this.WriteTraceMessage(TraceLevel.Info, $"Waiting for receiving {_tableName}'s records change notifications."); + } + + /// + /// Stops monitoring table's content changes. + /// + public override void Stop() + { + if (_task != null) + { + _cancellationTokenSource.Cancel(true); + _task?.Wait(); + } + + _task = null; + _disposed = true; + + this.WriteTraceMessage(TraceLevel.Info, "Stopped waiting for notification."); + } + + #endregion Public methods + + #region Protected virtual methods + + protected virtual string Spacer(int numberOrSpaces) + { + var stringBuilder = new StringBuilder(); + for (var i = 1; i <= numberOrSpaces; i++) stringBuilder.Append(' '); + return stringBuilder.ToString(); + } + + protected override DynamicRecordChangedEventArgs GetRecordChangedEventArgs(MessagesBag messagesBag) + { + return new DynamicRecordChangedEventArgs( + messagesBag, + _server, + _database, + _dataBaseObjectsNamingConvention, + base.CultureInfo, + this.IncludeOldValues); + } + + protected override string GetDataBaseName() + { + var sqlConnectionStringBuilder = new SqlConnectionStringBuilder(_connectionString); + return sqlConnectionStringBuilder.InitialCatalog; + } + + protected override string GetServerName() + { + var sqlConnectionStringBuilder = new SqlConnectionStringBuilder(_connectionString); + return sqlConnectionStringBuilder.DataSource; + } + + protected override string GetTableName(string tableName) + { + if (string.IsNullOrWhiteSpace(tableName)) + { + throw new Exception("Invalid table name"); + } + + return tableName; + } + + protected override string GetSchemaName(string schemaName) + { + return !string.IsNullOrWhiteSpace(schemaName) ? schemaName : "dbo"; + } + + protected virtual SqlServerVersion GetSqlServerVersion() + { + var sqlConnection = new SqlConnection(_connectionString); + + try + { + sqlConnection.Open(); + + var serverVersion = sqlConnection.ServerVersion; + if (string.IsNullOrWhiteSpace(serverVersion)) return SqlServerVersion.Unknown; + + var serverVersionDetails = serverVersion.Split(new[] { "." }, StringSplitOptions.None); + var versionNumber = int.Parse(serverVersionDetails[0]); + + if (versionNumber < 8) return SqlServerVersion.Unknown; + if (versionNumber == 8) return SqlServerVersion.SqlServer2000; + if (versionNumber == 9) return SqlServerVersion.SqlServer2005; + if (versionNumber == 10) return SqlServerVersion.SqlServer2008; + if (versionNumber == 11) return SqlServerVersion.SqlServer2012; + } + catch + { + throw new SqlServerVersionNotSupportedException(); + } + finally + { + sqlConnection.Close(); + } + + return SqlServerVersion.SqlServerLatest; + } + + protected override IEnumerable GetTableColumnsList() + { + var columnsList = new List(); + + using (var sqlConnection = new SqlConnection(_connectionString)) + { + sqlConnection.Open(); + using (var sqlCommand = sqlConnection.CreateCommand()) + { + sqlCommand.CommandText = string.Format(SqlScripts.InformationSchemaColumns, _schemaName, _tableName); + var reader = sqlCommand.ExecuteReader(); + while (reader.Read()) + { + var name = reader["COLUMN_NAME"].ToString(); + var type = reader["DATA_TYPE"].ToString().ConvertNumericType(); + var size = this.ComputeSize( + type, + reader.GetSafeString(reader.GetOrdinal("CHARACTER_MAXIMUM_LENGTH")), + reader.GetSafeString(reader.GetOrdinal("NUMERIC_PRECISION")), + reader.GetSafeString(reader.GetOrdinal("NUMERIC_SCALE")), + reader.GetSafeString(reader.GetOrdinal("DATETIME_PRECISION"))); + + columnsList.Add(new TableColumnInfo(name, type, size)); + } + } + } + + return columnsList; + } + + protected virtual bool CheckIfDatabaseObjectExists() + { + bool result; + + using (var sqlConnection = new SqlConnection(_connectionString)) + { + sqlConnection.Open(); + var sqlCommand = new SqlCommand($"SELECT COUNT(*) FROM sys.service_queues WITH (NOLOCK) WHERE name = N'{_dataBaseObjectsNamingConvention}';", sqlConnection); + result = (int)sqlCommand.ExecuteScalar() > 0; + sqlConnection.Close(); + } + + return result; + } + + protected override IList CreateDatabaseObjects(int timeOut, int watchDogTimeOut) + { + IList processableMessages; + + if (this.CheckIfDatabaseObjectExists() == false) + { + processableMessages = this.CreateSqlServerDatabaseObjects(watchDogTimeOut); + } + else + { + throw new DbObjectsWithSameNameException(_dataBaseObjectsNamingConvention); + } + + return processableMessages; + } + + protected override string GetBaseObjectsNamingConvention() + { + var name = $"{_schemaName}_{_tableName}"; + return $"{name}_{Guid.NewGuid()}"; + } + + protected override void DropDatabaseObjects() + { + if (!_databaseObjectsCreated) return; + + using (var sqlConnection = new SqlConnection(_connectionString)) + { + sqlConnection.Open(); + using (var sqlTransaction = sqlConnection.BeginTransaction(IsolationLevel.Serializable)) + { + using (var sqlCommand = sqlConnection.CreateCommand()) + { + var dropMessages = string.Join(Environment.NewLine, _processableMessages.Select((pm, index) => + { + if (index > 0) + { + return this.Spacer(8) + string.Format("IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];", pm); + } + + return string.Format("IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];", pm); + })); + + var dropAllScript = this.PrepareScriptDropAll(dropMessages); + + sqlCommand.Transaction = sqlTransaction; + sqlCommand.CommandType = CommandType.Text; + sqlCommand.CommandText = dropAllScript; + sqlCommand.ExecuteNonQuery(); + + sqlTransaction.Commit(); + } + } + } + + this.WriteTraceMessage(TraceLevel.Info, "DropDatabaseObjects method executed."); + } + + protected override void CheckRdbmsDependentImplementation() + { + this.CheckIfServiceBrokerIsEnabled(); + + var sqlVersion = this.GetSqlServerVersion(); + if (sqlVersion < SqlServerVersion.SqlServer2008) throw new SqlServerVersionNotSupportedException(sqlVersion); + } + + protected virtual string CreateWhereCondition(bool prependSpace = false) + { + var where = string.Empty; + + var filter = _filter?.Translate(); + if (!string.IsNullOrWhiteSpace(filter)) + { + where = (prependSpace ? " " : string.Empty) + "WHERE " + filter; + } + + return where.Trim(); + } + + protected virtual string PrepareInsertIntoTableVariableForUpdateChange(TableColumnInfo[] userInterestedColumns, string columnsForUpdateOf) + { + var insertIntoExceptTableStatement = this.PrepareInsertIntoModifiedRecordsTableStatement(userInterestedColumns); + + var scriptForInsertInTableVariable = !string.IsNullOrEmpty(columnsForUpdateOf) + ? string.Format(SqlScripts.InsertInTableVariableConsideringUpdateOf, columnsForUpdateOf, ChangeType.Update, insertIntoExceptTableStatement) + : string.Format(SqlScripts.InsertInTableVariable, ChangeType.Update, insertIntoExceptTableStatement); + + return scriptForInsertInTableVariable; + } + + protected virtual IList CreateSqlServerDatabaseObjects(int watchDogTimeOut) + { + var processableMessages = new List(); + var tableColumns = this.GetTableColumnsList(); + + var columnsForModifiedRecordsTable = this.PrepareColumnListForTableVariable(tableColumns, this.IncludeOldValues); + var columnsForExceptTable = this.PrepareColumnListForTableVariable(tableColumns, false); + var columnsForDeletedTable = this.PrepareColumnListForTableVariable(tableColumns, false); + + using (var sqlConnection = new SqlConnection(_connectionString)) + { + sqlConnection.Open(); + + using (var transaction = sqlConnection.BeginTransaction()) + { + var sqlCommand = new SqlCommand { Connection = sqlConnection, Transaction = transaction }; + + // Messages + var startMessageInsert = string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Insert); + sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{startMessageInsert}] VALIDATION = NONE;"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Message {startMessageInsert} created."); + processableMessages.Add(startMessageInsert); + + var startMessageUpdate = string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Update); + sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{startMessageUpdate}] VALIDATION = NONE;"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Message {startMessageUpdate} created."); + processableMessages.Add(startMessageUpdate); + + var startMessageDelete = string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Delete); + sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{startMessageDelete}] VALIDATION = NONE;"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Message {startMessageDelete} created."); + processableMessages.Add(startMessageDelete); + + var interestedColumns = tableColumns.ToArray(); + foreach (var userInterestedColumn in interestedColumns) + { + var message = $"{_dataBaseObjectsNamingConvention}/{userInterestedColumn.Name}"; + sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{message}] VALIDATION = NONE;"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Message {message} created."); + processableMessages.Add(message); + + if (this.IncludeOldValues) + { + message = $"{_dataBaseObjectsNamingConvention}/{userInterestedColumn.Name}/old"; + sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{message}] VALIDATION = NONE;"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Message {message} created."); + processableMessages.Add(message); + } + } + + var endMessage = string.Format(EndMessageTemplate, _dataBaseObjectsNamingConvention); + sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{endMessage}] VALIDATION = NONE;"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Message {endMessage} created."); + processableMessages.Add(endMessage); + + // Contract + var contractBody = string.Join("," + Environment.NewLine, processableMessages.Select(message => $"[{message}] SENT BY INITIATOR")); + sqlCommand.CommandText = $"CREATE CONTRACT [{_dataBaseObjectsNamingConvention}] ({contractBody})"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Contract {_dataBaseObjectsNamingConvention} created."); + + // Queues + sqlCommand.CommandText = $"CREATE QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver] WITH STATUS = ON, RETENTION = OFF, POISON_MESSAGE_HANDLING (STATUS = OFF);"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Queue {_dataBaseObjectsNamingConvention}_Receiver created."); + + sqlCommand.CommandText = $"CREATE QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender] WITH STATUS = ON, RETENTION = OFF, POISON_MESSAGE_HANDLING (STATUS = OFF);"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Queue {_dataBaseObjectsNamingConvention}_Sender created."); + + // Services + sqlCommand.CommandText = string.IsNullOrWhiteSpace(this.ServiceAuthorization) + ? $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Sender] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender];" + : $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Sender] AUTHORIZATION [{this.ServiceAuthorization}] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender];"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Service broker {_dataBaseObjectsNamingConvention}_Sender created."); + + sqlCommand.CommandText = string.IsNullOrWhiteSpace(this.ServiceAuthorization) + ? $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Receiver] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver] ([{_dataBaseObjectsNamingConvention}]);" + : $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Receiver] AUTHORIZATION [{this.ServiceAuthorization}] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver] ([{_dataBaseObjectsNamingConvention}]);"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Service broker {_dataBaseObjectsNamingConvention}_Receiver created."); + + // Activation Store Procedure + var dropMessages = string.Join(Environment.NewLine, processableMessages.Select((pm, index) => + { + if (index > 0) return this.Spacer(8) + string.Format("IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];", pm); + return string.Format("IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];", pm); + })); + + var dropAllScript = this.PrepareScriptDropAll(dropMessages); + sqlCommand.CommandText = this.PrepareScriptProcedureQueueActivation(dropAllScript); + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Procedure {_dataBaseObjectsNamingConvention} created."); + + // Begin conversation + this.ConversationHandle = this.BeginConversation(sqlCommand); + this.WriteTraceMessage(TraceLevel.Verbose, $"Conversation with handler {this.ConversationHandle} started."); + + // Trigger + var declareVariableStatement = this.PrepareDeclareVariableStatement(interestedColumns); + var selectForSetVariablesStatement = this.PrepareSelectForSetVariables(interestedColumns); + var sendInsertConversationStatements = this.PrepareSendConversation(ChangeType.Insert, interestedColumns); + var sendUpdatedConversationStatements = this.PrepareSendConversation(ChangeType.Update, interestedColumns); + var sendDeletedConversationStatements = this.PrepareSendConversation(ChangeType.Delete, interestedColumns); + + sqlCommand.CommandText = string.Format( + SqlScripts.CreateTrigger, + _dataBaseObjectsNamingConvention, + $"[{_schemaName}].[{_tableName}]", + columnsForModifiedRecordsTable, + this.PrepareColumnListForSelectFromTableVariable(tableColumns), + this.PrepareInsertIntoTableVariableForUpdateChange(interestedColumns, null), + declareVariableStatement, + selectForSetVariablesStatement, + sendInsertConversationStatements, + sendUpdatedConversationStatements, + sendDeletedConversationStatements, + ChangeType.Insert, + ChangeType.Update, + ChangeType.Delete, + string.Join(", ", this.GetDmlTriggerType(_dmlTriggerType)), + this.CreateWhereCondition(), + this.PrepareTriggerLogScript(), + this.ActivateDatabaseLogging ? " WITH LOG" : string.Empty, + columnsForExceptTable, + columnsForDeletedTable, + this.ConversationHandle, + dropAllScript); + + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, $"Trigger {_dataBaseObjectsNamingConvention} created."); + + // Associate Activation Store Procedure to sender queue + sqlCommand.CommandText = $"ALTER QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender] WITH ACTIVATION (PROCEDURE_NAME = [{_schemaName}].[{_dataBaseObjectsNamingConvention}_QueueActivationSender], MAX_QUEUE_READERS = 1, EXECUTE AS {this.QueueExecuteAs.ToUpper()}, STATUS = ON);"; + sqlCommand.ExecuteNonQuery(); + + // Run the watch-dog + sqlCommand.CommandText = $"BEGIN CONVERSATION TIMER ('{this.ConversationHandle.ToString().ToUpper()}') TIMEOUT = " + watchDogTimeOut + ";"; + sqlCommand.ExecuteNonQuery(); + this.WriteTraceMessage(TraceLevel.Verbose, "Watch dog started."); + + // Persist all objects + transaction.Commit(); + } + + _databaseObjectsCreated = true; + + this.WriteTraceMessage(TraceLevel.Info, $"All OK! Database objects created with naming {_dataBaseObjectsNamingConvention}."); + } + + return processableMessages; + } + + protected virtual Guid BeginConversation(SqlCommand sqlCommand) + { + sqlCommand.CommandText = $"DECLARE @h AS UNIQUEIDENTIFIER; BEGIN DIALOG CONVERSATION @h FROM SERVICE [{_dataBaseObjectsNamingConvention}_Sender] TO SERVICE '{_dataBaseObjectsNamingConvention}_Receiver' ON CONTRACT [{_dataBaseObjectsNamingConvention}] WITH ENCRYPTION = OFF; SELECT @h;"; + var conversationHandler = (Guid)sqlCommand.ExecuteScalar(); + if (conversationHandler == Guid.Empty) throw new ServiceBrokerConversationHandlerInvalidException(); + + return conversationHandler; + } + + protected virtual string PrepareTriggerLogScript() + { + if (this.ActivateDatabaseLogging == false) return string.Empty; + + return + Environment.NewLine + Environment.NewLine + "DECLARE @LogMessage VARCHAR(255);" + Environment.NewLine + + $"SET @LogMessage = 'SqlTableDependency: Message for ' + @dmlType + ' operation added in Queue [{_dataBaseObjectsNamingConvention}].'" + Environment.NewLine + + "RAISERROR(@LogMessage, 10, 1) WITH LOG;"; + } + + protected virtual string PrepareScriptProcedureQueueActivation(string dropAllScript) + { + var script = string.Format(SqlScripts.CreateProcedureQueueActivation, _dataBaseObjectsNamingConvention, dropAllScript, _schemaName); + return this.ActivateDatabaseLogging ? script : this.RemoveLogOperations(script); + } + + protected virtual string PrepareScriptDropAll(string dropMessages) + { + var script = string.Format(SqlScripts.ScriptDropAll, _dataBaseObjectsNamingConvention, dropMessages, _schemaName); + return this.ActivateDatabaseLogging ? script : this.RemoveLogOperations(script); + } + + protected virtual string RemoveLogOperations(string source) + { + while (true) + { + var startPos = source.IndexOf("PRINT N'SqlTableDependency:", StringComparison.InvariantCultureIgnoreCase); + if (startPos < 1) break; + + var endPos = source.IndexOf(".';", startPos, StringComparison.InvariantCultureIgnoreCase); + if (endPos < 1) break; + + source = source.Substring(0, startPos) + source.Substring(endPos + ".';".Length); + } + + return source; + } + + protected virtual string PrepareInsertIntoModifiedRecordsTableStatement(IReadOnlyCollection interestedColumns) + { + string insertIntoExceptStatement; + + var whereCondition = this.CreateWhereCondition(); + + var comma = new Separator(2, ","); + var sBuilderColumns = new StringBuilder(); + foreach (var column in interestedColumns) sBuilderColumns.Append($"{comma.GetSeparator()}[{column.Name}]"); + + var insertedAndDeletedTableVariable = + $"INSERT INTO @deletedTable SELECT {sBuilderColumns} FROM DELETED" + Environment.NewLine + + this.Spacer(12) + + $"INSERT INTO @insertedTable SELECT {sBuilderColumns} FROM INSERTED" + Environment.NewLine; + + if (interestedColumns.Any(tableColumn => string.Equals(tableColumn.Type.ToLowerInvariant(), "timestamp", StringComparison.OrdinalIgnoreCase) || string.Equals(tableColumn.Type.ToLowerInvariant(), "rowversion", StringComparison.OrdinalIgnoreCase))) + { + insertIntoExceptStatement = + insertedAndDeletedTableVariable + + this.Spacer(12) + + $"INSERT INTO @exceptTable SELECT [RowNumber],{sBuilderColumns} FROM @insertedTable"; + } + else + { + insertIntoExceptStatement = + insertedAndDeletedTableVariable + + this.Spacer(12) + + $"INSERT INTO @exceptTable SELECT [RowNumber],{sBuilderColumns} FROM @insertedTable EXCEPT SELECT [RowNumber],{sBuilderColumns} FROM @deletedTable"; + } + + if (this.IncludeOldValues) + { + comma = new Separator(2, ","); + sBuilderColumns = new StringBuilder(); + foreach (var column in interestedColumns) + { + sBuilderColumns.Append($"{comma.GetSeparator()}[{column.Name}]"); + sBuilderColumns.Append($"{comma.GetSeparator()}(SELECT d.[{column.Name}] FROM @deletedTable d WHERE d.[RowNumber] = e.[RowNumber])"); + } + } + + var insertIntoModifiedRecordsTable = + insertIntoExceptStatement + Environment.NewLine + Environment.NewLine + + this.Spacer(12) + + $"INSERT INTO @modifiedRecordsTable SELECT {sBuilderColumns} FROM @exceptTable e {whereCondition}"; + + return insertIntoModifiedRecordsTable; + } + + protected virtual IEnumerable GetDmlTriggerType(DmlTriggerType dmlTriggerType) + { + var afters = new List(); + if (dmlTriggerType.HasFlag(DmlTriggerType.All)) + { + afters.Add(DmlTriggerType.Insert.ToString().ToLowerInvariant()); + afters.Add(DmlTriggerType.Update.ToString().ToLowerInvariant()); + afters.Add(DmlTriggerType.Delete.ToString().ToLowerInvariant()); + } + else + { + if (dmlTriggerType.HasFlag(DmlTriggerType.Insert)) afters.Add(DmlTriggerType.Insert.ToString().ToLowerInvariant()); + if (dmlTriggerType.HasFlag(DmlTriggerType.Delete)) afters.Add(DmlTriggerType.Delete.ToString().ToLowerInvariant()); + if (dmlTriggerType.HasFlag(DmlTriggerType.Update)) afters.Add(DmlTriggerType.Update.ToString().ToLowerInvariant()); + } + + return afters; + } + + protected virtual MessagesBag CreateMessagesBag(Encoding encoding, ICollection processableMessages) + { + return new MessagesBag( + encoding ?? Encoding.Unicode, + new List { string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Insert), string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Update), string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Delete) }, + string.Format(EndMessageTemplate, _dataBaseObjectsNamingConvention), + processableMessages); + } + + protected virtual string PrepareColumnListForSelectFromTableVariable(IEnumerable tableColumns) + { + var columns = tableColumns.Select(c => + { + var column = $"[{c.Name}]"; + + if (this.IncludeOldValues) + { + column += ", NULL"; + } + + return column; + }); + + return string.Join(", ", columns.ToList()); + } + + protected virtual string PrepareColumnListForTableVariable(IEnumerable tableColumns, bool includeOldValues) + { + var columns = tableColumns.Select(tableColumn => + { + if (string.Equals(tableColumn.Type.ToLowerInvariant(), "timestamp", StringComparison.OrdinalIgnoreCase)) + { + var columnBinary = $"[{tableColumn.Name}] BINARY(8)"; + if (includeOldValues) columnBinary += $", [{tableColumn.Name}_old] BINARY(8)"; + return columnBinary; + } + + if (string.Equals(tableColumn.Type.ToLowerInvariant(), "rowversion", StringComparison.OrdinalIgnoreCase)) + { + var columnVarbinary = $"[{tableColumn.Name}] VARBINARY(8)"; + if (includeOldValues) columnVarbinary += $", [{ tableColumn.Name}_old] VARBINARY(8)"; + return columnVarbinary; + } + + if (!string.IsNullOrWhiteSpace(tableColumn.Size)) + { + var columnWithSize = $"[{tableColumn.Name}] {tableColumn.Type}({tableColumn.Size})"; + if (includeOldValues) columnWithSize += $", [{tableColumn.Name}_old] {tableColumn.Type}({tableColumn.Size})"; + return columnWithSize; + } + + var column = $"[{tableColumn.Name}] {tableColumn.Type}"; + if (includeOldValues) column += $", [{tableColumn.Name}_old] {tableColumn.Type}"; + return column; + }); + + return string.Join(", ", columns.ToList()); + } + + protected virtual string ComputeSize(string dataType, string characterMaximumLength, string numericPrecision, string numericScale, string dateTimePrecisione) + { + if (string.Equals(dataType.ToUpperInvariant(), "BINARY", StringComparison.OrdinalIgnoreCase) || + string.Equals(dataType.ToUpperInvariant(), "VARBINARY", StringComparison.OrdinalIgnoreCase) || + string.Equals(dataType.ToUpperInvariant(), "CHAR", StringComparison.OrdinalIgnoreCase) || + string.Equals(dataType.ToUpperInvariant(), "NCHAR", StringComparison.OrdinalIgnoreCase) || + string.Equals(dataType.ToUpperInvariant(), "VARCHAR", StringComparison.OrdinalIgnoreCase) || + string.Equals(dataType.ToUpperInvariant(), "NVARCHAR", StringComparison.OrdinalIgnoreCase)) + { + return characterMaximumLength == "-1" ? "MAX" : characterMaximumLength; + } + + if (string.Equals(dataType.ToUpperInvariant(), "DECIMAL", StringComparison.OrdinalIgnoreCase)) + { + return $"{numericPrecision},{numericScale}"; + } + + if (string.Equals(dataType.ToUpperInvariant(), "FLOAT", StringComparison.OrdinalIgnoreCase)) + { + return null; + } + + if (string.Equals(dataType.ToUpperInvariant(), "DATETIME2", StringComparison.OrdinalIgnoreCase) || + string.Equals(dataType.ToUpperInvariant(), "DATETIMEOFFSET", StringComparison.OrdinalIgnoreCase) || + string.Equals(dataType.ToUpperInvariant(), "TIME", StringComparison.OrdinalIgnoreCase)) + { + return $"{dateTimePrecisione}"; + } + + return null; + } + + protected override void CheckIfUserInterestedColumnsCanBeManaged() + { + var checkIfUserInterestedColumnsCanBeManaged = this.GetTableColumnsList().ToArray(); + foreach (var tableColumn in checkIfUserInterestedColumnsCanBeManaged) + { + if (string.Equals(tableColumn.Type.ToUpperInvariant(), "XML", StringComparison.OrdinalIgnoreCase) || + string.Equals(tableColumn.Type.ToUpperInvariant(), "IMAGE", StringComparison.OrdinalIgnoreCase) || + string.Equals(tableColumn.Type.ToUpperInvariant(), "TEXT", StringComparison.OrdinalIgnoreCase) || + string.Equals(tableColumn.Type.ToUpperInvariant(), "NTEXT", StringComparison.OrdinalIgnoreCase) || + string.Equals(tableColumn.Type.ToUpperInvariant(), "STRUCTURED", StringComparison.OrdinalIgnoreCase) || + string.Equals(tableColumn.Type.ToUpperInvariant(), "GEOGRAPHY", StringComparison.OrdinalIgnoreCase) || + string.Equals(tableColumn.Type.ToUpperInvariant(), "GEOMETRY", StringComparison.OrdinalIgnoreCase) || + string.Equals(tableColumn.Type.ToUpperInvariant(), "HIERARCHYID", StringComparison.OrdinalIgnoreCase) || + string.Equals(tableColumn.Type.ToUpperInvariant(), "SQL_VARIANT", StringComparison.OrdinalIgnoreCase)) + { + throw new ColumnTypeNotSupportedException($"{tableColumn.Type} column type is not an supported by SqlTableDependency."); + } + } + } + + protected virtual string ConvertFormat(TableColumnInfo userInterestedColumn) + { + return string.Equals(userInterestedColumn.Type, "datetime", StringComparison.OrdinalIgnoreCase) || string.Equals(userInterestedColumn.Type, "date", StringComparison.OrdinalIgnoreCase) ? ", 121" : string.Empty; + } + + protected virtual string ConvertValueByType(IReadOnlyCollection userInterestedColumns, TableColumnInfo userInterestedColumn, bool isOld = false) + { + var oldNameExtension = isOld ? "_old" : string.Empty; + + if (string.Equals(userInterestedColumn.Type, "binary", StringComparison.OrdinalIgnoreCase) || string.Equals(userInterestedColumn.Type, "varbinary", StringComparison.OrdinalIgnoreCase) || string.Equals(userInterestedColumn.Type, "timestamp", StringComparison.OrdinalIgnoreCase)) + { + return this.SanitizeVariableName(userInterestedColumns, userInterestedColumn.Name) + oldNameExtension; + } + + if (userInterestedColumn.Type.ToLower() == "float") + { + return $"CONVERT(NVARCHAR(MAX), RTRIM(LTRIM(STR({this.SanitizeVariableName(userInterestedColumns, userInterestedColumn.Name)}{oldNameExtension}{this.ConvertFormat(userInterestedColumn)}, 53, 16))))"; + } + + return $"CONVERT(NVARCHAR(MAX), {this.SanitizeVariableName(userInterestedColumns, userInterestedColumn.Name)}{oldNameExtension}{this.ConvertFormat(userInterestedColumn)})"; + } + + protected virtual string PrepareSendConversation(ChangeType dmlType, IReadOnlyCollection userInterestedColumns) + { + var sendList = userInterestedColumns + .Select(interestedColumn => + { + var sendStatement = this.Spacer(16) + $"IF {this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)} IS NOT NULL BEGIN" + Environment.NewLine + this.Spacer(20) + $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}] ({this.ConvertValueByType(userInterestedColumns, interestedColumn)})" + Environment.NewLine + this.Spacer(16) + "END" + Environment.NewLine + this.Spacer(16) + "ELSE BEGIN" + Environment.NewLine + this.Spacer(20) + $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}] (0x)" + Environment.NewLine + this.Spacer(16) + "END"; + if (this.IncludeOldValues) + { + sendStatement += Environment.NewLine + this.Spacer(16) + $"IF {this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)}_old IS NOT NULL BEGIN" + Environment.NewLine + this.Spacer(20) + $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}/old] ({this.ConvertValueByType(userInterestedColumns, interestedColumn, this.IncludeOldValues)})" + Environment.NewLine + this.Spacer(16) + "END" + Environment.NewLine + this.Spacer(16) + "ELSE BEGIN" + Environment.NewLine + this.Spacer(20) + $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}/old] (0x)" + Environment.NewLine + this.Spacer(16) + "END"; + } + + return sendStatement; + }) + .ToList(); + + sendList.Insert(0, $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, dmlType)}] (CONVERT(NVARCHAR, @dmlType))" + Environment.NewLine); + sendList.Add(Environment.NewLine + this.Spacer(16) + $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{string.Format(EndMessageTemplate, _dataBaseObjectsNamingConvention)}] (0x)"); + + return string.Join(Environment.NewLine, sendList); + } + + protected virtual string PrepareSelectForSetVariables(IReadOnlyCollection userInterestedColumns) + { + var result = string.Join(", ", userInterestedColumns.Select(interestedColumn => $"{this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)} = [{interestedColumn.Name}]")); + if (this.IncludeOldValues) result += ", " + string.Join(", ", userInterestedColumns.Select(interestedColumn => $"{this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)}_old = [{interestedColumn.Name}_old]")); + + return result; + } + + protected virtual string PrepareDeclareVariableStatement(IReadOnlyCollection interestedColumns) + { + var columnsList = (from interestedColumn in interestedColumns + let variableType = $"{interestedColumn.Type.ToLowerInvariant()}" + (string.IsNullOrWhiteSpace(interestedColumn.Size) + ? string.Empty + : $"({interestedColumn.Size})") + select this.DeclareStatement(interestedColumns, interestedColumn, variableType)).ToList(); + + return string.Join(Environment.NewLine + this.Spacer(4), columnsList); + } + + protected virtual string DeclareStatement(IReadOnlyCollection interestedColumns, TableColumnInfo interestedColumn, string variableType) + { + var variableName = this.SanitizeVariableName(interestedColumns, interestedColumn.Name); + + var declare = $"DECLARE {variableName} {variableType.ToLowerInvariant()}"; + if (this.IncludeOldValues) declare += $", {variableName}_old {variableType.ToLowerInvariant()}"; + + return declare; + } + + protected virtual string SanitizeVariableName(IReadOnlyCollection userInterestedColumns, string tableColumnName) + { + for (var i = 0; i < userInterestedColumns.Count; i++) + { + if (userInterestedColumns.ElementAt(i).Name == tableColumnName) + { + return "@var" + (i + 1); + } + } + + throw new SanitizeVariableNameException(tableColumnName); + } + + protected override void CheckIfConnectionStringIsValid() + { + if (string.IsNullOrWhiteSpace(_connectionString)) throw new ArgumentNullException(nameof(_connectionString)); + + SqlConnectionStringBuilder sqlConnectionStringBuilder; + + try + { + sqlConnectionStringBuilder = new SqlConnectionStringBuilder(_connectionString); + } + catch (Exception exception) + { + throw new InvalidConnectionStringException(_connectionString, exception); + } + + using (var sqlConnection = new SqlConnection(sqlConnectionStringBuilder.ConnectionString)) + { + try + { + sqlConnection.Open(); + } + catch (SqlException exception) + { + throw new ImpossibleOpenSqlConnectionException(sqlConnectionStringBuilder.ConnectionString, exception); + } + } + } + + protected override void CheckIfUserHasPermissions() + { + PrivilegesTable privilegesTable; + + using (var sqlConnection = new SqlConnection(_connectionString)) + { + sqlConnection.Open(); + using (var sqlCommand = sqlConnection.CreateCommand()) + { + sqlCommand.CommandText = SqlScripts.SelectUserGrants; + + var rows = SerializeSqlDataReader.Serialize(sqlCommand.ExecuteReader(CommandBehavior.CloseConnection)); + privilegesTable = PrivilegesTable.FromEnumerable(rows); + } + } + + if (privilegesTable.Rows.Count == 0) throw new UserWithNoPermissionException(); + + if (privilegesTable.Rows.Any(r => string.Equals(r.Role, "db_owner", StringComparison.OrdinalIgnoreCase))) + { + // Ok + } + else + { + foreach (var permission in Enum.GetValues(typeof(SqlServerRequiredPermission))) + { + var permissionToCheck = EnumUtil.GetDescriptionFromEnumValue((SqlServerRequiredPermission)permission); + if (privilegesTable.Rows.All(r => !string.Equals(r.PermissionType, permissionToCheck, StringComparison.OrdinalIgnoreCase))) + { + throw new UserWithMissingPermissionException(permissionToCheck); + } + } + } + } + + protected virtual void CheckIfServiceBrokerIsEnabled() + { + using (var sqlConnection = new SqlConnection(_connectionString)) + { + sqlConnection.Open(); + using (var sqlCommand = sqlConnection.CreateCommand()) + { + sqlCommand.CommandText = "SELECT is_broker_enabled FROM sys.databases WITH (NOLOCK) WHERE database_id = db_id();"; + if ((bool)sqlCommand.ExecuteScalar() == false) throw new ServiceBrokerNotEnabledException(); + } + } + } + + protected override void CheckIfTableExists() + { + using (var sqlConnection = new SqlConnection(_connectionString)) + { + sqlConnection.Open(); + using (var sqlCommand = sqlConnection.CreateCommand()) + { + sqlCommand.CommandText = string.Format(SqlScripts.InformationSchemaTables, _tableName, _schemaName); + if ((int)sqlCommand.ExecuteScalar() == 0) throw new NotExistingTableException(_tableName); + } + } + } + + protected virtual async Task WaitForNotifications( + CancellationToken cancellationToken, + Delegate[] onChangeSubscribedList, + Delegate[] onErrorSubscribedList, + Delegate[] onStatusChangedSubscribedList, + int timeOut, + int timeOutWatchDog) + { + this.WriteTraceMessage(TraceLevel.Verbose, "Get in WaitForNotifications."); + + var messagesBag = this.CreateMessagesBag(this.Encoding, _processableMessages); + var messageNumber = this.GetTableColumnsList().Count() * (this.IncludeOldValues ? 2 : 1) + 2; + + var waitForSqlScript = + $"BEGIN CONVERSATION TIMER ('{this.ConversationHandle.ToString().ToUpper()}') TIMEOUT = " + timeOutWatchDog + ";" + + $"WAITFOR (RECEIVE TOP({messageNumber}) [message_type_name], [message_body] FROM [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver]), TIMEOUT {timeOut * 1000};"; + + this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.Started); + + try + { + using (var sqlConnection = new SqlConnection(_connectionString)) + { + await sqlConnection.OpenAsync(cancellationToken); + this.WriteTraceMessage(TraceLevel.Verbose, "Connection opened."); + this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.WaitingForNotification); + + while (true) + { + messagesBag.Reset(); + + using (var sqlCommand = new SqlCommand(waitForSqlScript, sqlConnection)) + { + sqlCommand.CommandTimeout = 0; + this.WriteTraceMessage(TraceLevel.Verbose, "Executing WAITFOR command."); + + using (var sqlDataReader = await sqlCommand.ExecuteReaderAsync(cancellationToken).WithCancellation(cancellationToken)) + { + while (sqlDataReader.Read()) + { + var message = new Message(sqlDataReader.GetSqlString(0).Value, sqlDataReader.IsDBNull(1) ? null : sqlDataReader.GetSqlBytes(1).Value); + if (message.MessageType == SqlMessageTypes.ErrorType) throw new QueueContainingErrorMessageException(); + messagesBag.AddMessage(message); + this.WriteTraceMessage(TraceLevel.Verbose, $"Received message type = {message.MessageType}."); + } + } + } + + if (messagesBag.Status == MessagesBagStatus.Collecting) + { + throw new MessageMisalignedException("Received a number of messages lower than expected."); + } + + if (messagesBag.Status == MessagesBagStatus.Ready) + { + this.WriteTraceMessage(TraceLevel.Verbose, "Message ready to be notified."); + this.NotifyListenersAboutChange(onChangeSubscribedList, messagesBag); + this.WriteTraceMessage(TraceLevel.Verbose, "Message notified."); + } + } + } + } + catch (OperationCanceledException) + { + this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.StopDueToCancellation); + this.WriteTraceMessage(TraceLevel.Info, "Operation canceled."); + } + catch (AggregateException aggregateException) + { + this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.StopDueToError); + if (cancellationToken.IsCancellationRequested == false) this.NotifyListenersAboutError(onErrorSubscribedList, aggregateException.InnerException); + this.WriteTraceMessage(TraceLevel.Error, "Exception in WaitForNotifications.", aggregateException.InnerException); + } + catch (SqlException sqlException) + { + this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.StopDueToError); + if (cancellationToken.IsCancellationRequested == false) this.NotifyListenersAboutError(onErrorSubscribedList, sqlException); + this.WriteTraceMessage(TraceLevel.Error, "Exception in WaitForNotifications.", sqlException); + } + catch (Exception exception) + { + this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.StopDueToError); + if (cancellationToken.IsCancellationRequested == false) this.NotifyListenersAboutError(onErrorSubscribedList, exception); + this.WriteTraceMessage(TraceLevel.Error, "Exception in WaitForNotifications.", exception); + } + finally + { + this.DropDatabaseObjects(); + } + } + } + + #endregion Protected virtual methods +} \ No newline at end of file