/* ** BUFFERED TRANSPORT WRITER STREAM ** ** (c) COPYRIGHT MIT 1995. ** Please first read the full copyright statement in the file COPYRIGH. ** @(#) $Id: HTBufWrt.c,v 2.21 1998/05/19 16:49:22 frystyk Exp $ ** ** A buffered output stream. This stream lets you write characters to a ** stream without causing a write every time. The data is first written ** into a buffer. Data is written to the actual stream only when the ** buffer is full, or when the stream is flushed. */ /* Library include files */ #include "wwwsys.h" #include "WWWUtil.h" #include "WWWCore.h" #include "HTNetMan.h" #include "HTWriter.h" #include "HTTimer.h" #include "HTBufWrt.h" /* Implemented here */ struct _HTOutputStream { const HTOutputStreamClass * isa; HTOutputStream * target; /* Target for outgoing data */ HTHost * host; int allocated; /* Allocated Buffer size */ int growby; int expo; char * read; /* Position in 'data' */ char * data; /* buffer */ ms_t lastFlushTime; /* polar coordinates of the moon */ HTTimer * timer; }; #define PUTBLOCK(b,l) (*me->target->isa->put_block)(me->target,(b),(l)) /* ------------------------------------------------------------------------- */ /* ** This function is only called from either FlushEvent or HTBufferWriter_lazyFlush ** which means that only the host object or timeout can cause a flush */ PRIVATE int HTBufferWriter_flush (HTOutputStream * me) { int status = HT_OK; if (me && me->read > me->data) { me->lastFlushTime = HTGetTimeInMillis(); if ((status = PUTBLOCK(me->data, me->read - me->data))==HT_WOULD_BLOCK) return HT_WOULD_BLOCK; me->read = me->data; } return status; } PRIVATE int FlushEvent (HTTimer * timer, void * param, HTEventType type) { HTOutputStream * me = (HTOutputStream *) param; if (timer != me->timer) HTDebugBreak(__FILE__, __LINE__, "Buffer Writer timer %p not in sync\n", timer); if (PROT_TRACE) HTTrace("Buffer...... Timeout flushing %p with timer %p\n", me, timer); /* ** We ignore the return code here which we shouldn't!!! */ HTBufferWriter_flush(me); /* ** Delete the timer */ me->timer = NULL; return HT_OK; } PRIVATE int HTBufferWriter_lazyFlush (HTOutputStream * me) { HTNet * net; int delay; if (me->read <= me->data) { return HT_OK; /* nothing to flush */ } /* ** If we are allowed to delay the flush then set a timer with the ** delay descibed by our delay variable. If we can't delay then flush ** right away. */ delay = HTHost_findWriteDelay(me->host, me->lastFlushTime, me->read - me->data); /* ** Flush immediately */ if (!delay) { int status; if (STREAM_TRACE) HTTrace("Buffer...... Flushing %p\n", me); if ((status = HTBufferWriter_flush(me)) && me->timer) { HTTimer_delete(me->timer); me->timer = NULL; } return status; } /* ** Set a timer and tell the host we've done the write if ** we have not already started a timer earlier. If a timer ** does already exist then make sure that it hasn't expired. ** This can be the case if we have a really slow client that ** can't parse the data fast enough. */ if (!me->timer) { net = HTHost_getWriteNet(me->host); me->timer = HTTimer_new(NULL, FlushEvent, me, delay, YES, NO); HTHost_unregister(me->host, net, HTEvent_WRITE); if (STREAM_TRACE) HTTrace("Buffer...... Waiting %dms on %p\n", delay, me); } else { if (HTTimer_hasTimerExpired(me->timer)) { if (STREAM_TRACE) HTTrace("Buffer...... Dispatching old timer %p\n", me->timer); HTTimer_dispatch(me->timer); me->timer = NULL; } else { if (STREAM_TRACE) HTTrace("Buffer...... Waiting on unexpired timer %p\n", me->timer); } } return HT_OK; } PRIVATE int HTBufferWriter_free (HTOutputStream * me) { return HTBufferWriter_lazyFlush(me); } PRIVATE BOOL HTBufferWriter_addBuffer(HTOutputStream * me, int addthis) { if (me) { me->allocated += (addthis - addthis%me->growby + (me->growby*me->expo)); me->expo *= 2; if (STREAM_TRACE) HTTrace("Buffer...... Increasing buffer to %d bytes\n", me->allocated); if (me->data) { int size = me->read-me->data; if ((me->data = (char *) HT_REALLOC(me->data, me->allocated)) == NULL) HT_OUTOFMEM("HTBufferWriter_addBuffer"); me->read = me->data + size; } else { if ((me->data = (char *) HT_CALLOC(1, me->allocated)) == NULL) HT_OUTOFMEM("HTBufferWriter_addBuffer"); me->read = me->data; } return YES; } return NO; } PRIVATE int HTBufferWriter_abort (HTOutputStream * me, HTList * e) { if (STREAM_TRACE) HTTrace("Buffer...... ABORTING...\n"); if (me->target) (*me->target->isa->abort)(me->target, e); if (me->timer) { HTTimer_delete(me->timer); me->timer = NULL; } return HT_ERROR; } PRIVATE int HTBufferWriter_write (HTOutputStream * me, const char * buf, int len) { HTNet * net = HTHost_getWriteNet(me->host); int status; while (1) { int available = me->data + me->allocated - me->read; /* If we have enough buffer space */ if (len <= available) { int size = 0; memcpy(me->read, buf, len); me->read += len; /* If we have accumulated enough data then flush */ if ((size = me->read - me->data) > me->growby) { me->lastFlushTime = HTGetTimeInMillis(); status = PUTBLOCK(me->data, size); if (status == HT_OK) { HTNet_addBytesWritten(net, size); me->read = me->data; } else { return (status == HT_WOULD_BLOCK) ? HT_OK : HT_ERROR; } } return HT_OK; } else { /* Fill the existing buffer (if not already) and flush */ if (available) { memcpy(me->read, buf, available); buf += available; len -= available; me->read += available; } me->lastFlushTime = HTGetTimeInMillis(); status = PUTBLOCK(me->data, me->allocated); if (status == HT_OK) { HTNet_addBytesWritten(net, me->allocated); me->read = me->data; } else if (status == HT_WOULD_BLOCK) { HTBufferWriter_addBuffer(me, len); memcpy(me->read, buf, len); me->read += len; return HT_OK; } } } } /* Character handling ** ------------------ */ PRIVATE int HTBufferWriter_put_character (HTOutputStream * me, char c) { return HTBufferWriter_write(me, &c, 1); } /* String handling ** --------------- ** ** Strings must be smaller than this buffer size. */ PRIVATE int HTBufferWriter_put_string (HTOutputStream * me, const char * s) { return HTBufferWriter_write(me, s, (int) strlen(s)); } /* ** The difference between the close and the free method is that we don't ** close the connection in the free method - we only call the free method ** of the target stream. That way, we can keep the output stream as long ** as the channel itself. */ PRIVATE int HTBufferWriter_close (HTOutputStream * me) { if (me) { if (me->target) (*me->target->isa->close)(me->target); HT_FREE(me->data); HT_FREE(me); } return HT_OK; } PRIVATE const HTOutputStreamClass HTBufferWriter = { "BufferedSocketWriter", HTBufferWriter_lazyFlush, HTBufferWriter_free, HTBufferWriter_abort, HTBufferWriter_put_character, HTBufferWriter_put_string, HTBufferWriter_write, HTBufferWriter_close }; PUBLIC HTOutputStream * HTBufferWriter_new (HTHost * host, HTChannel * ch, void * param, int bufsize) { if (host && ch) { HTOutputStream * me = HTChannel_output(ch); if (!me) { HTOutputStream * me; int tcpbufsize = 0; #if defined(HAVE_GETSOCKOPT) && defined(SO_SNDBUF) /* ** Get the TCP socket buffer size */ { SOCKET sockfd = HTChannel_socket(ch); int size = sizeof(int); int status = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, (void *) &tcpbufsize, &size); if (status == -1) { if (STREAM_TRACE) HTTrace("Socket...... Could not get TCP send buffer size for socket %d\n", sockfd); } else { if (STREAM_TRACE) HTTrace("Socket...... TCP send buffer size is %d for socket %d\n", tcpbufsize, sockfd); } } #endif if (bufsize <= 0) bufsize = tcpbufsize ? tcpbufsize : OUTPUT_BUFFER_SIZE; if ((me = (HTOutputStream *) HT_CALLOC(1, sizeof(HTOutputStream)))==NULL || (me->data = (char *) HT_MALLOC(bufsize)) == NULL) HT_OUTOFMEM("HTBufferWriter_new"); me->isa = &HTBufferWriter; me->read = me->data; me->allocated = bufsize; me->growby = bufsize; me->expo = 1; me->target = HTWriter_new(host, ch, param, 0); me->host = host; return me; } } return NULL; }