Board index » delphi » Re: Sockets

Re: Sockets


2005-09-29 05:51:05 PM
delphi242
Quote
>>Time is critical as the client will timeout should no data be
>>received, this led me to believe that a threaded app would be best.
>>
>
>Hmm.. this is a bit messy, but if those are the requirements, I guess
>you're stuck with it :)
>
>So, you need a TCP server, to which the concentrator will make one
>connection and feed in a stream of data that the server will have to
>'distribute' to Client-Handling Threads. These CHT's will be able to
>make blocking calls etc. without affecting the other CHT's, so
>preserving quick responses for them.
>
>Is this it? Do I see it correctly?


The concentrator will fire off packets to the TCP Server in a random
order. ie It does not wait for one client to finish data transfer
before starting the next client. However a single TCP packet will not
have data from more then one client.
It's in this area that I start to feel a bit uneasy. There must be, at some
point in the link, some absolute, uninterrrupted serialisation of the data,
ie:
signal: 'this is the start of some data from client 1'
data: data bytes from client 1
signal: 'this is the end of the data from client 1'
If this is not done, a server that makes a read call and receives just one
byte will have no way of determining which client the byte belongs to.
I guess your protocol wraps the packets to ensure that, when stream data is
received, it is always known to which client the data belongs, even if only
one byte is received in a single read call, yes?
Quote
See I suppose the problem is that the final bit of data from the client
is a total transaction and I have to make sure all the individuals add
up to that total before sending back an ack. eg This could be the order
of data coming to me.

C1,C2,C3 Clients 1 2 and 3 sending individual Transactions.
T1,T2,T3 Clients 1 2 and 3 sending Totals


START
C1->C1->T1->C2->C3->C3->C2->C2->T2->T3
END

Ok, let's assume that your server makes read calls from the concentrator and
can always determine, even if it receives only a single byte in the call,
which client it came from.
At some point, a CHT will have to be assigned, and, later, be released.
Even with 'classic' TCP, this can sometimes be a problem because servers
and/or clients can become unreachable without any notification. With your
concentrator/protocol, there must be some sort of 'connect' notification
that signals that a client has connected and has been assigned some token to
uniquely identify it until such time as it 'disconnects'. This client token
could be stored in a list/array/something and have a queue associated with
it. A CHT would wait on the queue. All data reads from the concentrator
can then have their token looked up in the list and the data buffer queued
to the correct CHT.
How am I doing so far?
Rgds,
Martin
 
 

Re: Sockets

Martin James writes:
Quote
It's in this area that I start to feel a bit uneasy. There must be,
at some point in the link, some absolute, uninterrrupted
serialisation of the data, ie:

signal: 'this is the start of some data from client 1'
data: data bytes from client 1
signal: 'this is the end of the data from client 1'

If this is not done, a server that makes a read call and receives
just one byte will have no way of determining which client the byte
belongs to.

I guess your protocol wraps the packets to ensure that, when stream
data is received, it is always known to which client the data
belongs, even if only one byte is received in a single read call, yes?
The data comes from an X25 link which has a CLNP header. To get the
data
to me a Cisco router encapsulates the whole thing in TCP and passes to
an IP.
To get data back to the the client all I have to do is swap source and
destination addresses in the CLNP header and pass back to router.
However this is dynamic so I cannot say that this source address will
always be that client, that is where the actual data comes into it.
Inside the data contains an ID that I can reference.
So technically I never have a connection with the client... ie
SERVER(ME) ->TCP->CISCO ROUTER->X25......->TERMINAL(CLIENT)
Quote
Ok, let's assume that your server makes read calls from the
concentrator and can always determine, even if it receives only a
single byte in the call, which client it came from.

At some point, a CHT will have to be assigned, and, later, be
released. Even with 'classic' TCP, this can sometimes be a problem
because servers and/or clients can become unreachable without any
notification. With your concentrator/protocol, there must be some
sort of 'connect' notification that signals that a client has
connected and has been assigned some token to uniquely identify it
until such time as it 'disconnects'. This client token could be
stored in a list/array/something and have a queue associated with it.
A CHT would wait on the queue. All data reads from the concentrator
can then have their token looked up in the list and the data buffer
queued to the correct CHT.

How am I doing so far?
Doing well :)
I have no direct method of telling when the Terminal(Client) times
out/hangs up. What happens is that the router sends a FIN packet once
it times out. (which could be 5 seconds after the terminal hangs up)
That is why I thought threads would be best because I cant have the
program waiting to process data because it is waiting for other client
to timeout.
Regards
Jeremy
Quote

Rgds,
Martin
 

Re: Sockets

Quote


The data comes from an X25 link which has a CLNP header. To get the
data
to me a Cisco router encapsulates the whole thing in TCP and passes to
an IP.
Ahh...OK.
Quote
To get data back to the the client all I have to do is swap source and
destination addresses in the CLNP header and pass back to router.
However this is dynamic so I cannot say that this source address will
always be that client, that is where the actual data comes into it.
Inside the data contains an ID that I can reference.
OK, cookin' now:)
Quote
So technically I never have a connection with the client... ie


SERVER(ME) ->TCP->CISCO ROUTER->X25......->TERMINAL(CLIENT)



>Ok, let's assume that your server makes read calls from the
>concentrator and can always determine, even if it receives only a
>single byte in the call, which client it came from.
>
>At some point, a CHT will have to be assigned, and, later, be
>released. Even with 'classic' TCP, this can sometimes be a problem
>because servers and/or clients can become unreachable without any
>notification. With your concentrator/protocol, there must be some
>sort of 'connect' notification that signals that a client has
>connected and has been assigned some token to uniquely identify it
>until such time as it 'disconnects'. This client token could be
>stored in a list/array/something and have a queue associated with it.
>A CHT would wait on the queue. All data reads from the concentrator
>can then have their token looked up in the list and the data buffer
>queued to the correct CHT.
>
>How am I doing so far?


Doing well :)

I have no direct method of telling when the Terminal(Client) times
out/hangs up. What happens is that the router sends a FIN packet once
it times out. (which could be 5 seconds after the terminal hangs up)
One thing - in TCP-terms, rather than system-terms, is the router a sever or
client? I'd guess a client, ie. your 'server' app has to initially
connect to the router and identify itself as a link for data streams from
the X25 port, (perhaps just by connecting to a specific port on the router),
yes?
So, your 'SERVER' is a TCP-client of the router, but a system server of the
terminals, yes?
Quote
That is why I thought threads would be best because I cant have the
program waiting to process data because it is waiting for other client
to timeout.
OK, I don't pretent to understand all the details of this multi-protocol
link, but there should be no problem using a CHT per client and giving each
CHT it is own timeout on it is input queue. This would allow each virtual
circuit to have, effectively, its own timeout.
Is this what you are looking for?
So, the SERVER has a TCP-client that connects to the router X25-link port
and waits for TCP data. As data comes in, it parses the bytes to extract
CLNP frames. Each frame header contains a souce address and destination
address. After the header is more data containing an ID, (integer?) that
uniquely identifies the client plus actual 'application' data from the
client terminal identified by the ID.
How is a 'new client' signalled? I suppose that an ID that is not presently
contained in the SERVER list would indicate that a new terminal has logged
on, yes?
So maybee the SERVER needs some things like this:
-- TproducerConsumerQueue
A class for inter-thread comms - something that threads can wait on
efficiently for objects. I cannot see how any non-trivial, reliable,
threaded system can be made without one. There are a few P-C queue classes
around. I have posted a 'TsemaphoreQueue' class several times. That would
do. The queue should have a timeout on it is 'pop' calls. If you need it, I
can post it again. it is only a few lines.
--- TCLNPframe
A data carrier for each frame. You may turn over a lot of these frames, so
you may want to pool them. It may be a good idea to use this class for a
bit more than just always acting as a data buffer, so I suggest adding a
'command' enumeration field, eg:
*************************************************
EftFrameType=(EftRxClientData,EftTxToRouter,EftClientTimedOut,EftTerminate);
TCLNPframe=class
FmyClientID:integer;
FdataBuffer:array[0..maxDataSize-1] of byte;
FdataLen:integer;
public
command:EftFrameType;
property myClientID:integer read FclientID;
property dataLen:integer read Fdatalen;
property dataAddr:pointer read getDataStartAddr;
procedure initialize;
function addByte(thisByte:byte):boolean;
procedure reverseSourceAndDestination;
constructor create(forThisClientID:integer;);
end;
*************************************************
The 'addByte' method should be a CLNP byte-by-byte parser, (presumably a
state-machine), returning true if a complete, valid frame has been
assembled. If it detects a protocol error, it could raise an exception, so
perhaps allowing the caller to dump everything received so far by calling
'initialize' and so starting again.
--- TrouterConnectionThread, or TSERVER
This is the primary class of the server - it should create everything else,
directly or indirectly. it is a thread to run a TCP client to connect to the
router, (perhaps with a retry-on-disconnect loop), that assembles complete
TCLNPframe instances and queues them to the TframeDispatcher thread.
Perhaps something like this:
*************************************************
TSERVER=class(TThread)
private
FrouterAddr:string;
FrouterPort:string;
dispatcherInputQueue:TproducerConsumerQueue;
routerConnection:TtcpClient; // Indy TidTCPClient, maybee
procedure sendToRouter(sender:TObject);
protected
procedure execute; override;
public
constructor create(routerAddr:string,routerPort:string);
end;
TSERVER.create(routerAddr:string,routerPort:string);
begin
inherited create(true);
FrouterAddr:=routerAddr;
FrouterPort:=routerPort;
routerConnection:=TtcpClient.create;
dispatcherInputQueue:=TproducerConsumerQueue.create;
TframeDispatcherThread.create(dispatcherInputQueue,
sendToRouter);
resume;
end;
procedure TSERVER.sendToRouter(sender:TObject);
var frameToSend:TCLNPframe;
begin
frameToSend:=TCLNPframe(sender);
routerConnection.sendBuffer(frameToSend.dataAddr,frameToSend,dataLen);
end;
procedure TSERVER.execute;
procedure openConnectionToRouter:boolean;
begin
try
routerConnection.connect(FrouterAddr,FrouterPort);
result:=true;
except
result:=false;
end;
end;
procedure handleConnection;
var thisCLNPframe:TCLNPframe;
dataBuffer:array[0..CbuffLen-1] of byte;
dataLen,dataIndex, clientIndex:integer;
begin
thisCLNPframe:=TCLNPframe.create(0); // not known yet!
try
dataLen:=routerConnection.read(@dataBuffer,CbuffLen);
if (dataLen=0) then exit;
for dataIndex:=0 to dataLen-1 do
begin
if thisCLNPframe.addByte(dataBuffer[dataIndex]) then
begin
dispatcherInputQueue.push(thisCLNPframe);
thisCLNPframe:=TCLNPframe.create(0); // not known yet!
end;
end;
finally
end;
end;
begin
currentClients:=TvirtualClients.create;
repeat;
while not openConnectionToRouter do sleep(5000);
handleConnection;
until terminated;
end;
*************************************************
--TvirtualClient
A class that represents the interface to a single client-handler, perhaps
something like this:
*************************************************
TvirtualClient=class
FclientID:integer;
CHT:TclientHandlerThread;
public
property myClientID:integer read FclientID;
procedure sendFrameToHandler(thisFrame:TCLNPframe);
constructor
create(withThisID:integer;dispatcherQueue:TproducerConsumerQueue);
destructor destroy; override;
end;
procedure TvirtualClient.sendFrameToHandler(thisFrame:TCLNPframe);
begin
CHT.queueFrame(thisFrame);
end;
constructor TvirtualClient.create(withThisID:integer;
dispatcherQueue:TproducerConsumerQueue);
begin
FclientID:=withThisID;
CHT:=TclientHandlerThread.create(dispatcherQueue,withThisID);
end;
destructor TvirtualClient.destroy;
var suicideRequstFrame:TCLNPframe;
begin
suicideRequstFrame:=TCLNPframe.create(FclientID);
suicideRequstFrame.command:=EftTerminate;
CHT.queueFrame(suicideRequstFrame);
inherited destroy;
end;
*************************************************
--TVirtualClients
A container class for 'live' TvirtualClient instances, perhaps something
like this:
*************************************************
TvirtualClients=class(TObjectList);
private
FdispatcherInputQueue:TproducerConsumerQueue;
public
function findClient(thisFrame:TCLNPframe):integer;
procedure dispatchFrame(thisFrame:TCLNPframe, index:integer);
function addClient(withThisID:integer):integer;
procedure deleteClient(clientToRemove:index);
constructor create(dispatcherInputQueue:TproducerConsumerQueue);
end;
constructor create(dispatcherInputQueue:TproducerConsumerQueue);
begin
inherited create;
ownsObjects:=false;
FdispatcherInputQueue:=dispatcherInputQueue;
end;
procedure TvirtualClients.findClient(thisFrame:TCLNPframe):integer;
var itemIndex:integer;
thisClient:TvirtualClient;
begin
result:=-1;
for itemIndex:=0 to count-1 do
begin
thisClient:=TvirtualClient(items[itemIndex]);
if (thisClient.myClientID=thisFrame.myClientID) then
begin
result:=itemIndex;
exit;
end;
end;
end;
procedure TvirtualClients.dispatchFrame(thisFrame:TCLNPframe,
index:integer);
var thisVirtualClient:TvirtualClient;
begin
thisVirtualClient:=TvirtualClient(items[index]);
thisVirtualClient.sendFrameToHandler(thisFrame);
end;
procedure TvirtualClients.addClient(withThisID:integer):integer;
var newVirtualClient:TvirtualClient;
begin
newVirtualClient:=TvirtualClient.create(withThisID,FdispatcherInputQueue);
add(newVirtualClient);
end;
procedure TvirtualClients.deleteClient(clientToRemove:index);
var deletedVirtualClient::TvirtualClient;
begin
deletedVirtualClient:=TvirtualClient(items[index]);
delete(index);
deletedVirtualClient.free;
end;
*************************************************
--TframeDispatcherThread
A thread that waits for TCLNPframe instances containing terminal data from
the TrouterConnectionThread and any other TCLNPframe instances that may come
in with management commands. Why can the TrouterConnectionThread not
dispatch the frames directly? Why have a seperate dispatcher thread?
Specifically to allow management frames to be fed in. A thread that
performs management for all clients must not get stuck on blocking calls for
any extended period. Many socket components do not easily allow a thread to
wait on both a socket *and* a P-C queue, so the TrouterConnectionThread will
probably be stuck on a blocking socket read and cannot accept data from
anywhere else while this goes on. It might look something like this:
*************************************************
TframeDispatcherThread=class(TThread)
private
FinputQueue:TproducerConsumerQueue;
liveClients:TvirtualClients;
FsendMethod:TNotifyEvent;
protected
procedure execute; override;
public
constructor create(inputQueue:TproducerConsumerQueue;
sendToRouter:TNotifyEvent);
end;
procedure TframeDispatcherThread.create(inputQueue:TproducerConsumerQueue;
sendToRouter:TNotifyEvent);
begin
inherited create(true);
FinputQueue:=inputQueue;
FsendMethod:=sendToRouter;
freeOnTerminate:=true;
liveClients:=TvirtualClients.create(inputQueue);
resume;
end;
procedure TframeDispatcherThread.execute;
var thisCLNPFrame:TCLNPframe;
procedure dispatchFrameToClient;
var clientIndex:integer;
begin
clientIndex:=currentClients.findClient(thisCLNPframe);
if (clientIndex=-1) then clientIndex:=
currentClients.addClient(thisCLNPframe.myClientID);
currentClients.dispatchFrame(thisCLNPframe,clientIndex);
end;
procedure sendToRouter;
begin
try
FsendMethod(thisCLNPFrame);
finally
thisCLNPFrame.free;
end;
end;
procedure timeOutClient;
var clientIndex:integer;
begin
try
clientIndex:=currentClients.findClient(thisCLNPframe);
if (clientIndex<>-1) then
currentClients.deleteClient(clientIndex);
finally
thisCLNPFrame.free;
end;
end;
begin
try
while FinputQueue.pop(@thisCLNPFrame,INFINITE) do
begin
case thisCLNPFrame.command of
EftRxClientData:dispatchFrameToClient;
EftTxToRouter:FsendMethod();
EftClientTimedOut:timeOutClient;
EftTerminate:exit;
end;
end;
finally
end;
end;
*************************************************
--TclientHandlerThread
A per-client thread class for processing data, sending data back and timing
out the virtual client/circuit.
*************************************************
TclientHandlerThread=class(TThread)
private
inputQueue:TproducerConsumerQueue;
FdispatcherQueue:TproducerConsumerQueue;
FclientID:integer;
protected
procedure execute; override;
public
procedure queueFrame(inFrame:TCLNPframe);
constructor
create(dispatcherQueue:TproducerConsumerQueue;clientID:integer);
end;
procedure TclientHandlerThread.create
(dispatcherQueue:TproducerConsumerQueue;clientID:integer);
begin
inherited create(true);
FclientID:=clientID;
FdispatcherQueue:=dispatcherQueue;
inputQueue:=TproducerConsumerQueue.create;
freeOnTerminate:=true;
resume;
end;
procedure TclientHandlerThread.queueFrame(inFrame:TCLNPframe);
begin
inputQueue.push(inFrame);
end;
procedure TclientHandlerThread.execute;
var thisFrame:TCLNPframe;
begin
try
repeat
if inputQueue.pop(@thisFrame,CclientTimeout) do
begin
processClientFrame;
if (needToSendSomething) then
begin
thisFrame.initialize;
copySendDataIntoTheFrame;
thisFrame.reverseSourceAndDestination;
thisFrame.command:=EftTxToRouter;
FoutputQueue.push(thisFrame);
end
else
thisFrame.free;
end
else
begin // no frame received - timed out
thisFrame:=TCLNPframe.create(FclientID);
thisFrame.command:=EftClientTimedOut;
FoutputQueue.push(thisFrame);
end;
until false;
finally
inputQueue.free;
end;
end;
*************************************************
Obviously, there is much left out - exception handling, checking for 'FIN'
and removing client, the actual CLNP protocol parser, data handling etc.
Rgds,
Martin
 

Re: Sockets

Martin James writes:
Quote

One thing - in TCP-terms, rather than system-terms, is the router a
sever or client? I'd guess a client, ie. your 'server' app has
to initially connect to the router and identify itself as a link for
data streams from the X25 port, (perhaps just by connecting to a
specific port on the router), yes?

So, your 'SERVER' is a TCP-client of the router, but a system server
of the terminals, yes?
Yes my server just sits there and listens. The router really does all
the work.
Quote

>That is why I thought threads would be best because I cant have the
>program waiting to process data because it is waiting for other
>client to timeout.

OK, I don't pretent to understand all the details of this
multi-protocol link, but there should be no problem using a CHT per
client and giving each CHT it is own timeout on it is input queue.
This would allow each virtual circuit to have, effectively, its own
timeout.

Is this what you are looking for?
Yes that would be great.
Quote

So, the SERVER has a TCP-client that connects to the router X25-link
port and waits for TCP data. As data comes in, it parses the bytes
to extract CLNP frames. Each frame header contains a souce address
and destination address. After the header is more data containing an
ID, (integer?) that uniquely identifies the client plus actual
'application' data from the client terminal identified by the ID.

How is a 'new client' signalled? I suppose that an ID that is not
presently contained in the SERVER list would indicate that a new
terminal has logged on, yes?
No new terminal will ever be able to logon. Once I set them up in the
user list then they will be able to logon.
Quote

So maybee the SERVER needs some things like this:
...
...
....
Quote
*************************************************

Obviously, there is much left out - exception handling, checking for
'FIN' and removing client, the actual CLNP protocol parser, data
handling etc.
WOW, 10 points for this one Martin. The code has given me a base for
the threaded side which had me stumped.
I however used a packed record for my CLNP frame as I could just
type TCLNP = packed record
FNetwork : Byte;
FLength : Byte;
FVersion : Byte;
FLifeTime : Byte;
FType : Byte;
FSegment : array[0..1] of Byte;
FChecksum : array[0..1] of Byte;
FDestinationLength : Byte;
FDestination : array[0..4] of Byte;
FSourceLength : Byte;
FSource : array[0..4] of Byte;
end;
And then when I want to fill the frame
move(Source[0],CLNP,SizeOf(CLNP);
Once again thanks for your help.
Regards
Jeremy
 

Re: Sockets

Quote


WOW, 10 points for this one Martin. The code has given me a base for
the threaded side which had me stumped.
Well as long as it is no taken *too* literally. There are probably
inconsistencies and errors in the classes, but you can sort that out :)
IMHO, the main 'tricks' with these sort of threaded designs is:
1) Never hard-wait to output to another thread - communicate via P-C queues
where the queue is locked with a CriticalSection for only the very short
time taken for push/pop. If threads only wait for input, at one point, it
is 'very difficult' to get any deadlocks at all.
2) Serialise all management actions in one thread that does not block,
(except on it is input queue). This reduces avoidable interactions between
threads, makes debugging easier and avoids those {*word*193} 'lock the mangement
info directly with a CS and access it from many threads' schemes.
Quote
I however used a packed record for my CLNP frame as I could just

type TCLNP = packed record
FNetwork : Byte;
FLength : Byte;
FVersion : Byte;
FLifeTime : Byte;
FType : Byte;
FSegment : array[0..1] of Byte;
FChecksum : array[0..1] of Byte;
FDestinationLength : Byte;
FDestination : array[0..4] of Byte;
FSourceLength : Byte;
FSource : array[0..4] of Byte;
end;

..whatever <g>. I'd still be tempted to embed this packed record in a
class, just to provide methods and add additional fields/properties.
I don't know that much about CLNP, but you do, so no problem there.
Quote
Once again thanks for your help.

Appreciated :)
Rgds,
Martin