Daily Archives: March 10, 2014
migration C#, Java, C++ (day 8), TCP, Qt
The last two days were characterized by incompatibilities. There is no TCP support in the C++ standard libraries. I knew why I tried to avoid Boost and Qt. Visual Studio 2013 has its problems with Boost. You get really weird error messages that cannot be fixed at all. Why should Visual Studio actively support Boost? They have their own libraries, which are pretty cool.
I was fighting with several compilers and tools that generated roughly 50 GB of library data in total. Finally I gave up as it is really as mad as it is described on some web pages. I ended up installing the Qt Creator and had to rewrite the entire C++ code. I have to say that Qt is structured in a much better way than Boost. Overall the C++ coding took roughly 20 times longer than the C# coding, which was completed within 20 minutes. Most of that time was due to the incompatibility fight.
C++ executes faster, but let’s take a company’s point of view. If someone invests 250,000 USD in extra production time, wouldn’t it be smarter to buy 10x faster computers instead?
In the stock market, there are many traders telling you that it is all about speed. Not really! Data is coming in a sequence. I do know many situations where you have to slow down the system to solve specific sequence problems. Most market participant aren’t even aware of this.
Today I compare two TCP chat programs. C# is straight forward. The C++ side looks a bit weird, because the Qt Q_OBJECT macro requires header files. I did not want to split up the code into many pieces, so I kept the code short and I also kept it in the header sections.
ADDENDUM 21 May 2014
The Java code has been added today. The code structure is better than the C# example. The short Java test program deals with 1 server and 2 clients.
Below the Java code you can find a second C# example that uses the improved object approach similar to the Java code.
TCP
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Net; using System.Net.Sockets; using System.Threading; namespace DemoApp.ToCpp { public class Day8 { public class NetworkListener { private volatile bool _ExitLoop = true; private TcpListener _Listener; public int Port { get; private set; } public string IpAddress { get; private set; } public string ThreadName { get; private set; } private List<Client> _AllClients = new List<Client>(); public NetworkListener(string xIpAddress, int xPort, string xThreadName) { Port = xPort; IpAddress = xIpAddress; ThreadName = xThreadName; } // private class Client { private TcpClient _TcpClient; private NetworkStream _NetworkStream; public delegate void dOnShutDown(Client xClient); public event dOnShutDown OnShutDown; private volatile bool _ExitLoop = false; public readonly string IpAddress; public Client(TcpClient xTcpClient) { _TcpClient = xTcpClient; IpAddress = xTcpClient.Client.LocalEndPoint.ToString(); Console.WriteLine("SERVER MESSAGE -> " + IpAddress + " new client connected."); } // public void Disconnect() { _ExitLoop = true; dOnShutDown lEvent = OnShutDown; if (lEvent != null) lEvent(this); if (_NetworkStream != null) _NetworkStream.WriteTimeout = 5; // immediate timeout } // public void ListenAsync() { Thread lThread = new Thread(new ThreadStart(Listen)); lThread.IsBackground = true; lThread.Name = "Client_" + IpAddress; lThread.Start(); } // private void Listen() { using (_NetworkStream = _TcpClient.GetStream()) { using (StreamReader lStreamReader = new StreamReader(_NetworkStream)) { while (!_ExitLoop) { try { string s = lStreamReader.ReadLine(); if (s == null) break; Console.WriteLine("SERVER MESSAGE -> " + IpAddress + " Client said: " + s); } catch (IOException) { if (_ExitLoop) Console.WriteLine("SERVER MESSAGE -> user requested TcpClient shutdown"); else Console.WriteLine("SERVER MESSAGE -> disconnected"); } catch (Exception ex) { Console.WriteLine(ex.Message); } } } Console.WriteLine("SERVER MESSAGE -> " + IpAddress + " Listener is shutting down"); } _NetworkStream = null; } // } // class public bool WaitForClientsAsync() { if (!_ExitLoop) { Console.WriteLine("SERVER MESSAGE -> Listener running already"); return false; } _ExitLoop = false; try { _Listener = new TcpListener(IPAddress.Parse(IpAddress), Port); _Listener.Start(); Thread lThread = new Thread(new ThreadStart(WaitForAClient)); lThread.IsBackground = true; lThread.Name = ThreadName; lThread.Start(); return true; } catch (Exception ex) { Console.WriteLine(ex.Message); } return false; } // public void Disconnect() { _ExitLoop = true; List<Client> lClone; // we need a clone, because the _AllClients collection will be manipulated during the foreach loop lock (_AllClients) { lClone = new List<Client>(_AllClients); } foreach (Client lClient in lClone) lClient.Disconnect(); _Listener.Stop(); } // private void WaitForAClient() { try { while (!_ExitLoop) { int lClientCount; lock (_AllClients) lClientCount = _AllClients.Count; Console.WriteLine("SERVER MESSAGE -> Waiting for client number " + lClientCount); TcpClient lTcpClient = _Listener.AcceptTcpClient(); Client lClient = new Client(lTcpClient); lClient.OnShutDown += OnClientShutDown; lock (_AllClients) _AllClients.Add(lClient); lClient.ListenAsync(); // starts a background thread } } catch (Exception ex) { Console.WriteLine(ex.Message); } } void OnClientShutDown(NetworkListener.Client xClient) { lock (_AllClients) { _AllClients.Remove(xClient); } } // } // class public class NetworkClient { public int Port { get; private set; } public string IpAddress { get; private set; } public string ThreadName { get; private set; } private NetworkStream _NetworkStream = null; private bool _ExitLoop = true; private BlockingCollection<string> _Queue = new BlockingCollection<string>(); public NetworkClient(string xIpAddress, int xPort, string xThreadName) { Port = xPort; IpAddress = xIpAddress; ThreadName = xThreadName; } // public void ConnectAsync() { if (!_ExitLoop) return; // running already _ExitLoop = false; Thread lThread = new Thread(new ThreadStart(Loop)); lThread.IsBackground = true; lThread.Name = ThreadName; lThread.Start(); } // public void Disconnect() { _ExitLoop = true; _Queue.Add(null); if (_NetworkStream != null) _NetworkStream.ReadTimeout = 100; } // public void Send(string xText) { if (string.IsNullOrEmpty(xText)) return; _Queue.Add(xText); // thread safety by context switching } // private void Loop() { try { using (TcpClient lClient = new TcpClient()) { lClient.Connect(IpAddress, Port); using (_NetworkStream = lClient.GetStream()) { using (StreamWriter lStreamWriter = new StreamWriter(_NetworkStream)) { while (!_ExitLoop) { try { string s = _Queue.Take(); if (string.IsNullOrEmpty(s)) break; lStreamWriter.WriteLine(s); } catch (System.IO.IOException) { if (_ExitLoop) Console.WriteLine("CLIENT -> User requested shutdown."); else Console.WriteLine("CLIENT -> disconnected"); } catch (Exception ex) { Console.WriteLine(ex.Message); } } } _ExitLoop = true; Console.WriteLine(Environment.NewLine + "CLIENT -> shutting down"); } } } catch (Exception ex) { Console.WriteLine(ex.Message); } } // } // class public static void Test() { NetworkListener lServer = new NetworkListener("127.0.0.1", 65432, "Server"); NetworkClient lClient = new NetworkClient("127.0.0.1", 65432, "Client"); lServer.WaitForClientsAsync(); lClient.ConnectAsync(); lClient.Send("Hello World!"); // send a text message across the network System.Threading.Thread.Sleep(1000); lClient.Disconnect(); lServer.Disconnect(); Console.ReadLine(); } // } // class } // namespace
example output:
SERVER MESSAGE -> Waiting for client number 0
SERVER MESSAGE -> 127.0.0.1:65432 new client connected.
SERVER MESSAGE -> Waiting for client number 1
SERVER MESSAGE -> 127.0.0.1:65432 Client said: Hello World!CLIENT -> shutting down
SERVER MESSAGE -> 127.0.0.1:65432 Listener is shutting down
A blocking operation was interrupted by a call to WSACancelBlockingCall
#pragma once #include <iostream> #include <QtNetwork> #include <QObject> #include <QString> #include <QTcpSocket> #include <string> using namespace std; class Client : QObject { Q_OBJECT private: int _Port = 0; QString _IpAddress; QTcpSocket _TcpSocket; private slots: void Connected() { cout << "CLIENT MESSAGE -> connection established " << _IpAddress.toStdString() << ":" << _Port << endl; } public: inline int getPort() { return _Port; } inline QString getIpAddress() { return _IpAddress; } Client(QString xIpAddress, int xPort) : QObject() { _IpAddress = xIpAddress; _Port = xPort; } ~Client() { cout << "deconstructing Client class " << _IpAddress.toStdString() << endl; } Client(QObject* lParent): QObject(lParent) { cout << "Client :)" << endl; } void Connect() { connect(&_TcpSocket, SIGNAL(connected()), this, SLOT(Connected())); QHostAddress lHostAddress(_IpAddress); _TcpSocket.connectToHost(lHostAddress, _Port); while (!_TcpSocket.waitForConnected( 2000 )) { cout << "CLIENT MESSAGE -> waiting for a feedback" << endl; } } void Disconnect() { _TcpSocket.close(); } // void Send(const string &xText) { if (xText == "") return; const char *lText = xText.c_str(); _TcpSocket.write(lText, xText.length() + 1); } // }; //---------------------------------------------------------------------------------------------------------------------- #pragma once #include <QTcpSocket> #include <QHostAddress> #include <thread> #include <set> #include <iostream> #include <QtNetwork> using namespace std; class Connection : QObject { Q_OBJECT private: QTcpSocket *_TcpSocket; string _IpAddress; void Read() { try { char lBuffer[1024] = {0}; _TcpSocket->read(lBuffer, _TcpSocket->bytesAvailable()); cout << "SERVER MESSAGE -> " << _IpAddress << " Client said: " << lBuffer << endl; } catch (exception &ex) { cout << ex.what() << endl; } } // public: const string &IpAddress() const { return _IpAddress; } void Disconnect() { _TcpSocket->close(); delete _TcpSocket; } // Connection(QTcpSocket *xTcpSocket) : _TcpSocket(xTcpSocket) { QHostAddress lAddress = xTcpSocket->localAddress(); QString s = lAddress.toString(); _IpAddress = s.toLocal8Bit().constData(); connect(xTcpSocket, SIGNAL(readyRead()), this, SLOT(Read())); // setup event cout << "SERVER MESSAGE -> " + _IpAddress << " new client connected." << endl; } // Connection(QObject* lParent): QObject(lParent) { cout << "Connection :)"; } ~Connection() { Disconnect(); cout << "deconstructing Connection class " << _IpAddress << endl; } }; // class //---------------------------------------------------------------------------------------------------------------------- #pragma once #include <iostream> #include <QtNetwork> #include <QObject> #include <QString> #include <QTcpSocket> #include <QTcpServer> #include <string> #include "Connection.cpp" #include <mutex> using namespace std; class Server : QObject { Q_OBJECT private: list<Connection*> _AllClients; mutex mutex_AllClients; int _Port; string _IpAddress; QTcpServer _TcpServer; void OnClientShutDown(Connection *xClient) { cout << "SERVER MESSAGE -> Connection shutting down" << endl; mutex_AllClients.lock(); _AllClients.remove(xClient); mutex_AllClients.unlock(); xClient->Disconnect(); } // private slots: void NewConnection() { cout << "SERVER MESSAGE -> new connection" << endl; QTcpSocket *lSocket = _TcpServer.nextPendingConnection(); Connection *lClient = new Connection(lSocket); mutex_AllClients.lock(); _AllClients.push_back(lClient); mutex_AllClients.unlock(); } // public: Server(QObject* lParent): QObject(lParent) { cout << "Server :)"; } ~Server() { cout << "deconstructing Server class " << endl; } inline int getPort() { return _Port; } inline string getIpAddress() { return _IpAddress; } Server(string xIpAddress, int xPort) : QObject() { _Port = xPort; _IpAddress = xIpAddress; connect(&_TcpServer, SIGNAL(newConnection()), this, SLOT(NewConnection())); // setup event QString lIpAddress(xIpAddress.c_str()); //QHostAddress lHostAddress(lIpAddress); if (_TcpServer.listen(QHostAddress::Any, xPort)) { //if(_TcpServer.listen(lHostAddress, xPort)) { cout << "SERVER MESSAGE -> listener up and running" << endl; } } // void Disconnect() { mutex_AllClients.lock(); list<Connection*> lClone; list<Connection*>::const_iterator lIterator; for (lIterator = _AllClients.begin(); lIterator != _AllClients.end(); ++lIterator) { lClone.push_back(*lIterator); // we need a clone, because the _AllClients collection will be manipulated during the foreach loop } mutex_AllClients.unlock(); for (Connection *lClient : lClone) lClient->Disconnect(); _TcpServer.close(); } // }; // class //---------------------------------------------------------------------------------------------------------------------- #include <QApplication> #include <iostream> #include <thread> #include "Server.h" #include "Client.h" int main(int argc, char** argv) { QApplication app(argc, argv); try { Server lServer("127.0.0.1", 65432); Client lClient("127.0.0.1", 65432); lClient.Connect(); this_thread::sleep_for(chrono::milliseconds(1000)) ; lClient.Send("Hello World!"); // send a text message across the network this_thread::sleep_for(chrono::milliseconds(1000)) ; lClient.Disconnect(); lServer.Disconnect(); cout << "press enter to exit" << endl; cin.get(); } catch (exception &ex) { cerr << ex.what() << endl; } return app.exec(); } //
package DemoApp; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; public final class Server extends Thread { private final ArrayList<Client> _Clients = new ArrayList<>(); public final int port; ServerSocket _ServerSocket; public Server(String xName, int xPort) throws IOException { super(xName); port = xPort; setDaemon(true); _ServerSocket = new ServerSocket(xPort); } // constructor public void close() throws IOException { this.interrupt(); // exit loop synchronized (_Clients) { for (Client lClient : _Clients) lClient.close(); _Clients.clear(); } _ServerSocket.close(); System.out.println(_ServerSocket.getLocalSocketAddress().toString() + " server closed down"); } // public void broadcast(String xMessage) { synchronized (_Clients) { for (Client lClient : _Clients) lClient.getWriter().send(xMessage); } } // @Override public void run() { System.out.println(_ServerSocket.getLocalSocketAddress().toString() + " server is waiting for clients"); try { while (true) { Socket lSocket = _ServerSocket.accept(); // wait for a client to connect Client lClient = new Client(lSocket); synchronized (_Clients) { _Clients.add(lClient); } lClient.connect(); } } catch (IOException ex) { //System.out.println(ex.getMessage()); } } // } // class // ----------------------------------------------------------------------------------------------------------------------- package DemoApp; import java.io.IOException; import java.net.Socket; public class Client { private Socket _Socket = null; public final String IpAddressHost; public final String IpAddressLocal; public final String IpConnectionString; private Reader _Reader = null; private Writer _Writer = null; public Socket getSocket() { return _Socket; } // public Reader getReader() { return _Reader; } // public Writer getWriter() { return _Writer; } // public void close() throws IOException { synchronized (this) { if (_Reader != null) _Reader.close(true); _Reader = null; if (_Writer != null) _Writer.close(true); _Writer = null; } System.out.println(IpConnectionString + " client closed down"); } // public Client(Socket xSocket) { _Socket = xSocket; IpAddressHost = xSocket.getInetAddress().getHostAddress() + ":" + _Socket.getPort(); IpAddressLocal = xSocket.getLocalAddress().getHostAddress() + ":" + _Socket.getLocalPort(); IpConnectionString = "(Local " + IpAddressLocal + ", Host " + IpAddressHost + ")"; } // public void connect() throws IOException { _Reader = new Reader(this); _Writer = new Writer(this); _Reader.start(); // start listening _Writer.start(); // start write loop } // } // class // ----------------------------------------------------------------------------------------------------------------------- package DemoApp; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; public class Reader extends Thread { private final Client _Client; private BufferedReader _StreamReader; public Reader(Client xClient) throws IOException { _Client = xClient; Socket lSocket = xClient.getSocket(); _StreamReader = new BufferedReader(new InputStreamReader(lSocket.getInputStream())); } // constructor public void close(boolean xExitLoop) throws IOException { synchronized (this) { if (xExitLoop) interrupt(); if (_StreamReader == null) return; _StreamReader.close(); _StreamReader = null; } System.out.println(_Client.IpConnectionString + " closed reader connection"); } // @Override public void run() { System.out.println(_Client.IpConnectionString + " start new reader connection"); while (true) try { String lInput = _StreamReader.readLine(); if (lInput == null) break; if (lInput.isEmpty()) continue; System.out.println(_Client.IpConnectionString + " message received: " + lInput); if ("EXIT".equals(lInput)) break; } catch (IOException | RuntimeException ex) { System.out.println(ex); } try { close(false); } catch (IOException ex) { } } // } // class // ----------------------------------------------------------------------------------------------------------------------- package DemoApp; import java.io.IOException; import java.io.PrintWriter; import java.net.Socket; import java.util.LinkedList; public class Writer extends Thread { private final Client _Client; private PrintWriter _PrintWriter; private final LinkedList<String> _WriteQueue = new LinkedList<>(); public Writer(Client xClient) throws IOException { setDaemon(true); _Client = xClient; Socket lSocket = xClient.getSocket(); _PrintWriter = new PrintWriter(lSocket.getOutputStream(), true); } // constructor public void close(boolean xExitLoop) throws IOException { synchronized (this) { if (xExitLoop) interrupt(); if (_PrintWriter == null) return; _PrintWriter.close(); _PrintWriter = null; } System.out.println(_Client.IpConnectionString + " closed writer connection"); } // public void send(String xText) { if (xText == null) return; synchronized (_WriteQueue) { _WriteQueue.addLast(xText); _WriteQueue.notify(); } } // @Override public void run() { System.out.println(_Client.IpConnectionString + " start new writer connection"); try { while (true) { String lText = null; synchronized (_WriteQueue) { if (_WriteQueue.size() > 0) lText = _WriteQueue.removeFirst(); else _WriteQueue.wait(); } if (lText == null) continue; if (_PrintWriter == null) continue; _PrintWriter.println(lText); } } catch (InterruptedException ex) { if (ex.getMessage() != null) System.out.println(ex.getMessage()); } try { close(false); } catch (IOException ex) { } } // } // class // ----------------------------------------------------------------------------------------------------------------------- package DemoApp; import java.io.IOException; import java.net.Socket; public class Test { private static void Sleep() { try { Thread.sleep(2000); } catch (InterruptedException ex) { System.out.println(ex.getMessage()); } } // public static void main(String[] args) throws IOException { Server lServer = new Server("MyServer", 65432); lServer.start(); Sleep(); Socket lClientSocket1 = new Socket("localhost", 65432); Client lClient1 = new Client(lClientSocket1); lClient1.connect(); Sleep(); Socket lClientSocket2 = new Socket("localhost", 65432); Client lClient2 = new Client(lClientSocket2); lClient2.connect(); Sleep(); lClient1.getWriter().send("client->server: hello server!"); Sleep(); lServer.broadcast("server->client: hello client!"); Sleep(); lClient1.getWriter().send("EXIT"); lClient1.close(); lClient2.getWriter().send("EXIT"); lClient2.close(); Sleep(); lServer.close(); //System.in.read(); Sleep(); } } // class
example output:
0.0.0.0/0.0.0.0:65432 server is waiting for clients
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) start new reader connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) start new writer connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) start new reader connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) message received: client->server: hello server!
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) message received: EXIT
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) closed reader connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) closed reader connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) closed writer connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) message received: EXIT
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) closed reader connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) closed reader connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) closed writer connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) closed writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) closed writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) client closed down
0.0.0.0/0.0.0.0:65432 server closed down
using System; using System.Collections.Generic; using System.Net; using System.Net.Sockets; using System.Threading; namespace Tcp { public class Server { private readonly List<Client> _Clients = new List<Client>(); public readonly int Port; private TcpListener _TcpListener = null; private Thread _Thread; private readonly string Name; public Server(String xName, int xPort) { Name = xName; Port = xPort; } // constructor public bool start() { try { _TcpListener = new TcpListener(IPAddress.Parse("127.0.0.1"), Port); _TcpListener.Start(); _Thread = new Thread(loop); _Thread.IsBackground = true; _Thread.Name = Name; _Thread.Start(); } catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); return false; } return true; } // public void Close() { if (_Thread != null) { _Thread.Interrupt(); _Thread = null; } lock (_Clients) { foreach (Client lClient in _Clients) lClient.close(); _Clients.Clear(); } _TcpListener.Stop(); Console.WriteLine(_TcpListener.LocalEndpoint.ToString() + " server closed down"); _TcpListener = null; } // public void broadcast(String xMessage) { lock (_Clients) { foreach (Client lClient in _Clients) lClient.Writer.Send(xMessage); } } // private void loop() { Console.WriteLine(_TcpListener.LocalEndpoint.ToString() + " server is waiting for clients"); try { while (true) { int lClientCount; lock (_Clients) lClientCount = _Clients.Count; TcpClient lTcpClient = _TcpListener.AcceptTcpClient(); // wait for a client to connect Client lClient = new Client(lTcpClient); lock (_Clients) _Clients.Add(lClient); lClient.Connect(); } } catch (ThreadInterruptedException) { } // Thread interruption catch (SocketException) { } // Thread interruption => SocketException catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); } } // } // class } // namespace // ----------------------------------------------------------------------------------------------------------------------- using System; using System.Net.Sockets; namespace Tcp { public class Client { public TcpClient TcpClient { get; private set; } public readonly String IpAddressHost; public readonly String IpAddressLocal; public readonly String IpConnectionString; public Reader Reader { get; private set; } public Writer Writer { get; private set; } public void close() { lock (this) { if (Reader != null) Reader.Close(true); Reader = null; if (Writer != null) Writer.Close(true); Writer = null; if (TcpClient != null) { TcpClient.Close(); Console.WriteLine(IpConnectionString + " TcpClient read/write closed"); } TcpClient = null; } Console.WriteLine(IpConnectionString + " client closed down"); } // public Client(string xHostname, int xPort) : this(new TcpClient(xHostname, xPort)) { } // create and connect public Client(TcpClient xTcpClient) { TcpClient = xTcpClient; IpAddressHost = TcpClient.Client.RemoteEndPoint.ToString(); IpAddressLocal = TcpClient.Client.LocalEndPoint.ToString(); IpConnectionString = "(Local " + IpAddressLocal + ", Host " + IpAddressHost + ")"; } // public void Connect() { Reader = new Reader(this); Writer = new Writer(this); Reader.Start(); // start listening Writer.Start(); // start write loop } // } // class } // namespace // ----------------------------------------------------------------------------------------------------------------------- using System; using System.IO; using System.Net.Sockets; using System.Threading; namespace Tcp { public class Reader { private readonly Client _Client; private StreamReader _StreamReader = null; private Thread _Thread = null; public Reader(Client xClient) { _Client = xClient; NetworkStream lNetworkStream = xClient.TcpClient.GetStream(); _StreamReader = new StreamReader(lNetworkStream); } // constructor public void Start() { _Thread = new Thread(loop); _Thread.IsBackground = true; _Thread.Start(); } // public void Close(bool xExitLoop) { lock (this) { if (xExitLoop && (_Thread != null)) { _Thread.Interrupt(); _Thread = null; } // _Client.TcpClient.Close() in Client object //if (_StreamReader == null) return; //_StreamReader.Close(); //_StreamReader = null; } //Console.WriteLine(_Client.IpConnectionString + " closed reader connection"); } // private void loop() { Console.WriteLine(_Client.IpConnectionString + " start new reader connection"); while (true) { try { String lInput = _StreamReader.ReadLine(); if (lInput == null) break; if (lInput == "") continue; Console.WriteLine(_Client.IpConnectionString + " message received: " + lInput); if ("EXIT".Equals(lInput)) break; } catch (ThreadInterruptedException) { break; } // Thread interruption catch (IOException) { break; } // StreamReader disposed catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); } } Close(false); } // } // class } // namespace // ----------------------------------------------------------------------------------------------------------------------- using System; using System.Collections.Concurrent; using System.IO; using System.Net.Sockets; using System.Threading; namespace Tcp { public class Writer { private readonly Client _Client; private StreamWriter _StreamWriter = null; private Thread _Thread = null; private readonly BlockingCollection<String> _WriteQueue = new BlockingCollection<string>(); public Writer(Client xClient) { _Client = xClient; NetworkStream lNetworkStream = xClient.TcpClient.GetStream(); _StreamWriter = new StreamWriter(lNetworkStream); } // constructor public void Start() { _Thread = new Thread(Loop); _Thread.IsBackground = true; _Thread.Start(); } // public void Close(bool xExitLoop) { lock (this) { if (xExitLoop && (_Thread != null)) { _Thread.Interrupt(); _Thread = null; } // _Client.TcpClient.Close() in Client object //if (_StreamWriter == null) return; //_StreamWriter.Close(); //_StreamWriter = null; } //Console.WriteLine(_Client.IpConnectionString + " closed writer connection"); } // public void Send(string xText) { if (string.IsNullOrEmpty(xText)) return; _WriteQueue.Add(xText); } // private void Loop() { Console.WriteLine(_Client.IpConnectionString + " start new writer connection"); while (true) { try { string lText = null; lText = _WriteQueue.Take(); if (string.IsNullOrEmpty(lText)) continue; if (_StreamWriter == null) continue; _StreamWriter.WriteLine(lText); _StreamWriter.Flush(); } catch (ObjectDisposedException) { break; } catch (ThreadInterruptedException) { break; } // Thread interruption catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); } } Close(false); } // } // class } // namespace // ----------------------------------------------------------------------------------------------------------------------- using System.Net.Sockets; using System.Threading; namespace Tcp { class Program { static void Main(string[] args) { Server lServer = new Server("MyServer", 65432); lServer.start(); Thread.Sleep(2000); TcpClient lClientSocket1 = new TcpClient("localhost", 65432); Client lClient1 = new Client(lClientSocket1); lClient1.Connect(); Thread.Sleep(2000); TcpClient lClientSocket2 = new TcpClient("localhost", 65432); Client lClient2 = new Client(lClientSocket2); lClient2.Connect(); Thread.Sleep(2000); lClient1.Writer.Send("client->server: hello server!"); Thread.Sleep(2000); lServer.broadcast("server->client: hello client!"); Thread.Sleep(2000); lClient1.Writer.Send("EXIT"); Thread.Sleep(2000); lClient1.close(); lClient2.Writer.Send("EXIT"); Thread.Sleep(2000); lClient2.close(); Thread.Sleep(1000); lServer.Close(); //Console.ReadLine(); Thread.Sleep(5000); } // main } // class } // namespace
example output:
127.0.0.1:65432 server is waiting for clients
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) start new reader connection
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) start new reader connection
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) start new writer connection
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) message received: client->server: hello server!
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) message received: EXIT
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) TcpClient read/write closed
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) message received: EXIT
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) TcpClient read/write closed
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) TcpClient read/write closed
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) TcpClient read/write closed
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) client closed down
127.0.0.1:65432 server closed down