@@ -61,6 +61,7 @@ internal sealed partial class Win32FileStream : FileStreamBase
6161 private long _appendStart ; // When appending, prevent overwriting file.
6262
6363 private Task < int > _lastSynchronouslyCompletedTask = null ;
64+ private Task _activeBufferOperation = null ;
6465
6566 [ System . Security . SecuritySafeCritical ]
6667 public Win32FileStream ( String path , FileMode mode , FileAccess access , FileShare share , int bufferSize , FileOptions options , FileStream parent ) : base ( parent )
@@ -331,6 +332,10 @@ private unsafe void VerifyHandleIsSync()
331332 }
332333 }
333334
335+ private bool HasActiveBufferOperation
336+ {
337+ get { return _activeBufferOperation != null && ! _activeBufferOperation . IsCompleted ; }
338+ }
334339
335340 public override bool CanRead
336341 {
@@ -520,6 +525,26 @@ private void FlushRead()
520525 _readLen = 0 ;
521526 }
522527
528+ // Returns a task that flushes the internal write buffer
529+ private Task FlushWriteAsync ( CancellationToken cancellationToken )
530+ {
531+ Debug . Assert ( _isAsync ) ;
532+ Debug . Assert ( _readPos == 0 && _readLen == 0 , "FileStream: Read buffer must be empty in FlushWriteAsync!" ) ;
533+
534+ // If the buffer is already flushed, don't spin up the OS write
535+ if ( _writePos == 0 ) return Task . CompletedTask ;
536+
537+ Task flushTask = WriteInternalCoreAsync ( _buffer , 0 , _writePos , cancellationToken ) ;
538+ _writePos = 0 ;
539+
540+ // Update the active buffer operation
541+ _activeBufferOperation = HasActiveBufferOperation ?
542+ Task . WhenAll ( _activeBufferOperation , flushTask ) :
543+ flushTask ;
544+
545+ return flushTask ;
546+ }
547+
523548 // Writes are buffered. Anytime the buffer fills up
524549 // (_writePos + delta > _bufferSize) or the buffer switches to reading
525550 // and there is left over data (_writePos > 0), this function must be called.
@@ -529,7 +554,7 @@ private void FlushWrite(bool calledFromFinalizer)
529554
530555 if ( _isAsync )
531556 {
532- Task writeTask = WriteInternalCoreAsync ( _buffer , 0 , _writePos , CancellationToken . None ) ;
557+ Task writeTask = FlushWriteAsync ( CancellationToken . None ) ;
533558 // With our Whidbey async IO & overlapped support for AD unloads,
534559 // we don't strictly need to block here to release resources
535560 // since that support takes care of the pinning & freeing the
@@ -1259,52 +1284,107 @@ private Task WriteInternalAsync(byte[] array, int offset, int numBytes, Cancella
12591284 if ( ! _parent . CanWrite ) throw __Error . GetWriteNotSupported ( ) ;
12601285
12611286 Debug . Assert ( ( _readPos == 0 && _readLen == 0 && _writePos >= 0 ) || ( _writePos == 0 && _readPos <= _readLen ) , "We're either reading or writing, but not both." ) ;
1287+ Debug . Assert ( ! _isPipe || ( _readPos == 0 && _readLen == 0 ) , "Win32FileStream must not have buffered data here! Pipes should be unidirectional." ) ;
12621288
1263- if ( _isPipe )
1289+ bool writeDataStoredInBuffer = false ;
1290+ if ( ! _isPipe ) // avoid async buffering with pipes, as doing so can lead to deadlocks (see comments in ReadInternalAsyncCore)
12641291 {
1265- // Pipes are tricky, at least when you have 2 different pipes
1266- // that you want to use simultaneously. When redirecting stdout
1267- // & stderr with the Process class, it's easy to deadlock your
1268- // parent & child processes when doing writes 4K at a time. The
1269- // OS appears to use a 4K buffer internally. If you write to a
1270- // pipe that is full, you will block until someone read from
1271- // that pipe. If you try reading from an empty pipe and
1272- // Win32FileStream's ReadAsync blocks waiting for data to fill it's
1273- // internal buffer, you will be blocked. In a case where a child
1274- // process writes to stdout & stderr while a parent process tries
1275- // reading from both, you can easily get into a deadlock here.
1276- // To avoid this deadlock, don't buffer when doing async IO on
1277- // pipes.
1278- Debug . Assert ( _readPos == 0 && _readLen == 0 , "Win32FileStream must not have buffered data here! Pipes should be unidirectional." ) ;
1292+ // Ensure the buffer is clear for writing
1293+ if ( _writePos == 0 )
1294+ {
1295+ if ( _readPos < _readLen )
1296+ {
1297+ FlushRead ( ) ;
1298+ }
1299+ _readPos = 0 ;
1300+ _readLen = 0 ;
1301+ }
12791302
1280- if ( _writePos > 0 )
1281- FlushWrite ( false ) ;
1303+ // Determine how much space remains in the buffer
1304+ int remainingBuffer = _bufferSize - _writePos ;
1305+ Debug . Assert ( remainingBuffer >= 0 ) ;
12821306
1283- return WriteInternalCoreAsync ( array , offset , numBytes , cancellationToken ) ;
1284- }
1307+ // Simple/common case:
1308+ // - The write is smaller than our buffer, such that it's worth considering buffering it.
1309+ // - There's no active flush operation, such that we don't have to worry about the existing buffer being in use.
1310+ // - And the data we're trying to write fits in the buffer, meaning it wasn't already filled by previous writes.
1311+ // In that case, just store it in the buffer.
1312+ if ( numBytes < _bufferSize && ! HasActiveBufferOperation && numBytes <= remainingBuffer )
1313+ {
1314+ if ( _buffer == null )
1315+ _buffer = new byte [ _bufferSize ] ;
12851316
1286- // Handle buffering.
1287- if ( _writePos == 0 )
1288- {
1289- if ( _readPos < _readLen ) FlushRead ( ) ;
1290- _readPos = 0 ;
1291- _readLen = 0 ;
1317+ Buffer . BlockCopy ( array , offset , _buffer , _writePos , numBytes ) ;
1318+ _writePos += numBytes ;
1319+ writeDataStoredInBuffer = true ;
1320+
1321+ // There is one special-but-common case, common because devs often use
1322+ // byte[] sizes that are powers of 2 and thus fit nicely into our buffer, which is
1323+ // also a power of 2. If after our write the buffer still has remaining space,
1324+ // then we're done and can return a completed task now. But if we filled the buffer
1325+ // completely, we want to do the asynchronous flush/write as part of this operation
1326+ // rather than waiting until the next write that fills the buffer.
1327+ if ( numBytes != remainingBuffer )
1328+ return Task . CompletedTask ;
1329+
1330+ Debug . Assert ( _writePos == _bufferSize ) ;
1331+ }
12921332 }
12931333
1294- int n = _bufferSize - _writePos ;
1295- if ( numBytes <= n )
1334+ // At this point, at least one of the following is true:
1335+ // 1. There was an active flush operation (it could have completed by now, though).
1336+ // 2. The data doesn't fit in the remaining buffer (or it's a pipe and we chose not to try).
1337+ // 3. We wrote all of the data to the buffer, filling it.
1338+ //
1339+ // If there's an active operation, we can't touch the current buffer because it's in use.
1340+ // That gives us a choice: we can either allocate a new buffer, or we can skip the buffer
1341+ // entirely (even if the data would otherwise fit in it). For now, for simplicity, we do
1342+ // the latter; it could also have performance wins due to OS-level optimizations, and we could
1343+ // potentially add support for PreAllocatedOverlapped due to having a single buffer. (We can
1344+ // switch to allocating a new buffer, potentially experimenting with buffer pooling, should
1345+ // performance data suggest it's appropriate.)
1346+ //
1347+ // If the data doesn't fit in the remaining buffer, it could be because it's so large
1348+ // it's greater than the entire buffer size, in which case we'd always skip the buffer,
1349+ // or it could be because there's more data than just the space remaining. For the latter
1350+ // case, we need to issue an asynchronous write to flush that data, which then turns this into
1351+ // the first case above with an active operation.
1352+ //
1353+ // If we already stored the data, then we have nothing additional to write beyond what
1354+ // we need to flush.
1355+ //
1356+ // In any of these cases, we have the same outcome:
1357+ // - If there's data in the buffer, flush it by writing it out asynchronously.
1358+ // - Then, if there's any data to be written, issue a write for it concurrently.
1359+ // We return a Task that represents one or both.
1360+
1361+ // Flush the buffer asynchronously if there's anything to flush
1362+ Task flushTask = null ;
1363+ if ( _writePos > 0 )
12961364 {
1297- if ( _writePos == 0 ) _buffer = new byte [ _bufferSize ] ;
1298- Buffer . BlockCopy ( array , offset , _buffer , _writePos , numBytes ) ;
1299- _writePos += numBytes ;
1300-
1301- return Task . CompletedTask ;
1365+ flushTask = FlushWriteAsync ( cancellationToken ) ;
1366+
1367+ // If we already copied all of the data into the buffer,
1368+ // simply return the flush task here. Same goes for if the task has
1369+ // already completed and was unsuccessful.
1370+ if ( writeDataStoredInBuffer ||
1371+ flushTask . IsFaulted ||
1372+ flushTask . IsCanceled )
1373+ {
1374+ return flushTask ;
1375+ }
13021376 }
13031377
1304- if ( _writePos > 0 )
1305- FlushWrite ( false ) ;
1378+ Debug . Assert ( ! writeDataStoredInBuffer ) ;
1379+ Debug . Assert ( _writePos == 0 ) ;
13061380
1307- return WriteInternalCoreAsync ( array , offset , numBytes , cancellationToken ) ;
1381+ // Finally, issue the write asynchronously, and return a Task that logically
1382+ // represents the write operation, including any flushing done.
1383+ Task writeTask = WriteInternalCoreAsync ( array , offset , numBytes , cancellationToken ) ;
1384+ return
1385+ ( flushTask == null || flushTask . Status == TaskStatus . RanToCompletion ) ? writeTask :
1386+ ( writeTask . Status == TaskStatus . RanToCompletion ) ? flushTask :
1387+ Task . WhenAll ( flushTask , writeTask ) ;
13081388 }
13091389
13101390 [ System . Security . SecuritySafeCritical ] // auto-generated
@@ -1395,7 +1475,7 @@ private unsafe Task WriteInternalCoreAsync(byte[] bytes, int offset, int numByte
13951475 throw Win32Marshal . GetExceptionForWin32Error ( errorCode ) ;
13961476 }
13971477 }
1398- else
1478+ else // ERROR_IO_PENDING
13991479 {
14001480 // Only once the IO is pending do we register for cancellation
14011481 completionSource . RegisterForCancellation ( ) ;
0 commit comments