Setting up a TCP/IP connection

In this case we are using a TCP/IP connection to get XML feeds from a remote server.

public class FeedProvider : IFeedProvider, IDisposable
{
    private readonly IDataFeedConfig _config;
    private volatile bool _readingThreadStop;
 
    public event EventHandler<Message> Notify;
    public event EventHandler<FeedType2> FeedReceived;
 
    public FeedProvider(IDataFeedConfig config)
    {
        _config = config;
    }
 
    public void StartReceivingFeeds()
    {
        Notify(this, new Message(MessageType.MSG_FEED_PROVIDER_START_RECEIVING_FEEDS));
        var readingThread = new Thread(HandleReadingThread);
        _readingThreadStop = false;
        readingThread.Start();
    }
 
    public void StopReceivingFeeds()
    {
        Notify(this, new Message(MessageType.MSG_FEED_PROVIDER_STOP_RECEIVING_FEEDS));
        Dispose();
    }
 
    #region Private Methods
 
    private void HandleReadingThread()
    {
        const int UNKNOWN_ERR_MAX_COUNT = 10;
        const int TIMEOUT_DURATION = 10000;
 
        while (true)
        {
            lock (this)
            {
                if (_readingThreadStop) break;
            }
 
            var unknownErrCount = 0;
            string readStr = null;
            using (var client = new TcpClient())
            {
                try
                {
                    //Start the connection
                    Notify(this, new Message("Connecting to the Server"));
                    client.Client.SetSocketOption(SocketOptionLevel.Socket,
                        SocketOptionName.KeepAlive, true);
                    client.Connect(_config.Mf_HostName, _config.Mf_HostPort);
 
                    //The connection is open
                    Notify(this, new Message(MessageType.MSG_FEED_PROVIDER_CONNECTION_OPEN));
                    using (NetworkStream stream = client.GetStream())
                    {
                        stream.ReadTimeout = TIMEOUT_DURATION;
                        stream.WriteTimeout = TIMEOUT_DURATION;
                        using (var sr = new StreamReader(stream))
                        {
                            using (var sw = new StreamWriter(stream))
                            {
                                while (client.Connected)
                                {
                                    lock (this)
                                    {
                                        if (_readingThreadStop) break;
                                    }
 
                                    try
                                    {
                                        readStr = sr.ReadLine();
                                        XDocument doc = XDocument.Parse(readStr);
                                        IEnumerable<XDocument> response = ManageReceivedDoc(doc);
                                        foreach (XDocument responseDoc in response)
                                        {
                                            Notify(this, new Message(string.Format("Sending XML: {0}", responseDoc)));
                                            sw.WriteLine(responseDoc.ToString(SaveOptions.DisableFormatting));
                                            sw.Flush();
                                        }
                                        unknownErrCount = 0;
                                    }
 
                                    #region Error Management
 
                                    catch (XmlException)
                                    {
                                        Notify(this, new Message(string.Format("Received Invalid XML: {0}", readStr)));
                                    }
                                    catch (Exception e)
                                    {
                                        unknownErrCount++;
                                        Notify(this, new Message(string.Format("Unknown Error ({0} of {1}): {2}", unknownErrCount,
                                            UNKNOWN_ERR_MAX_COUNT, e.Message)));
                                        if (unknownErrCount == UNKNOWN_ERR_MAX_COUNT)
                                        {
                                            Notify(this, new Message("Too Many Unknown Errors. The Connection will be Closed"));
                                            break;
                                        }
                                    }
                                    #endregion
                                }
                            }
                        }
                    }
                }
 
                #region Error Management
 
                catch (SocketException e)
                {
                    Notify(this, new Message(string.Format("Cannot access to Server: {0}", e.Message)));
                }
                catch (Exception e)
                {
                    Notify(this, new Message(string.Format("Unknown Error: {0}", e.Message)));
                }
                #endregion
 
                if (client.Connected) client.Close();
                Notify(this, new Message(MessageType.MSG_FEED_PROVIDER_CONNECTION_CLOSED));
            }
 
            //Wait before reconnect
            Thread.Sleep(_config.ReconnectFrequency);
        }
        Notify(this, new Message("Thread Closed"));
    }

    private IEnumerable<XDocument> ManageReceivedDoc(XDocument doc)
    {
        var response = new List<XDocument>();
        switch (doc.Root.Name.LocalName)
        {
            case "FeedType1":
                var feedType1 = new FeedType1(doc);
                Notify(this, new Message(string.Format("Received FeedType1: {0}", feedType1)));
		
                //For this feed is provided a response
                AddSubscriptionsToResponse(response, feedType1);
                break;
 
            case "FeedType2":
                var feedType2 = new FeedType2(doc);
                Notify(this, new Message(string.Format("Received FeedType2: {0}", feedType2)));
                FeedReceived(this, feedType2 );
                break;
 
            default:
                Notify(this, new Message(string.Format("Received Unrecognized XML: {0}", doc)));
                break;
        }
        return response;
    }
 
    private void AddSubscriptionsToResponse(List<XDocument> response,
        FeedType1 feedType1)
    {
		response.Add(new ResponseMessage(feedType1).Doc);
    }
    #endregion
 
    public void Dispose()
    {
        lock (this)
        {
            _readingThreadStop = true;
        }
    }
}
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s