//------------------------------------------------------------------------------
// File: AsyncIo.cpp
//
// Desc: DirectShow sample code - base library with I/O functionality.
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------------------------
#include "stdafx.h"
#include "asyncio.h"
// --- CAsyncRequest ---
// implementation of CAsyncRequest representing a single
// outstanding request. All the i/o for this object is done
// in the Complete method.
// init the params for this request.
// Read is not issued until the complete call
HRESULT
CAsyncRequest::Request(
CAsyncIo *pIo,
CAsyncStream *pStream,
LONGLONG llPos,
LONG lLength,
BOOL bAligned,
BYTE* pBuffer,
LPVOID pContext, // filter's context
DWORD_PTR dwUser) // downstream filter's context
{
m_pIo = pIo;
m_pStream = pStream;
m_llPos = llPos;
m_lLength = lLength;
m_bAligned = bAligned;
m_pBuffer = pBuffer;
m_pContext = pContext;
m_dwUser = dwUser;
m_hr = VFW_E_TIMEOUT; // not done yet
return S_OK;
}
// issue the i/o if not overlapped, and block until i/o complete.
// returns error code of file i/o
//
//
HRESULT
CAsyncRequest::Complete()
{
m_pStream->Lock();
m_hr = m_pStream->SetPointer(m_llPos);
if(S_OK == m_hr)
{
DWORD dwActual;
m_hr = m_pStream->Read(m_pBuffer, m_lLength, m_bAligned, &dwActual);
if(m_hr == OLE_S_FIRST)
{
if(m_pContext)
{
IMediaSample *pSample = reinterpret_cast<IMediaSample *>(m_pContext);
pSample->SetDiscontinuity(TRUE);
m_hr = S_OK;
}
}
if(FAILED(m_hr))
{
}
else if(dwActual != (DWORD)m_lLength)
{
// tell caller size changed - probably because of EOF
m_lLength = (LONG) dwActual;
m_hr = S_FALSE;
}
else
{
m_hr = S_OK;
}
}
m_pStream->Unlock();
return m_hr;
}
// --- CAsyncIo ---
// note - all events created manual reset
CAsyncIo::CAsyncIo(CAsyncStream *pStream)
: m_hThread(NULL),
m_evWork(TRUE),
m_evDone(TRUE),
m_evStop(TRUE),
m_listWork(NAME("Work list")),
m_listDone(NAME("Done list")),
m_bFlushing(FALSE),
m_cItemsOut(0),
m_bWaiting(FALSE),
m_pStream(pStream)
{
}
CAsyncIo::~CAsyncIo()
{
// move everything to the done list
BeginFlush();
// shutdown worker thread
CloseThread();
// empty the done list
POSITION pos = m_listDone.GetHeadPosition();
while(pos)
{
CAsyncRequest* pRequest = m_listDone.GetNext(pos);
delete pRequest;
}
m_listDone.RemoveAll();
}
// ready for async activity - call this before calling Request.
//
// start the worker thread if we need to
//
// !!! use overlapped i/o if possible
HRESULT
CAsyncIo::AsyncActive(void)
{
return StartThread();
}
// call this when no more async activity will happen before
// the next AsyncActive call
//
// stop the worker thread if active
HRESULT
CAsyncIo::AsyncInactive(void)
{
return CloseThread();
}
// add a request to the queue.
HRESULT
CAsyncIo::Request(
LONGLONG llPos,
LONG lLength,
BOOL bAligned,
BYTE * pBuffer,
LPVOID pContext,
DWORD_PTR dwUser)
{
if(bAligned)
{
if(!IsAligned(llPos) ||
!IsAligned(lLength) ||
!IsAligned((LONG_PTR) pBuffer))
{
return VFW_E_BADALIGN;
}
}
CAsyncRequest* pRequest = new CAsyncRequest;
if (!pRequest)
return E_OUTOFMEMORY;
HRESULT hr = pRequest->Request(this,
m_pStream,
llPos,
lLength,
bAligned,
pBuffer,
pContext,
dwUser);
if(SUCCEEDED(hr))
{
// might fail if flushing
hr = PutWorkItem(pRequest);
}
if(FAILED(hr))
{
delete pRequest;
}
return hr;
}
// wait for the next request to complete
HRESULT
CAsyncIo::WaitForNext(
DWORD dwTimeout,
LPVOID * ppContext,
DWORD_PTR * pdwUser,
LONG * pcbActual)
{
CheckPointer(ppContext,E_POINTER);
CheckPointer(pdwUser,E_POINTER);
CheckPointer(pcbActual,E_POINTER);
// some errors find a sample, others don't. Ensure that
// *ppContext is NULL if no sample found
*ppContext = NULL;
// wait until the event is set, but since we are not
// holding the critsec when waiting, we may need to re-wait
for(;;)
{
if(!m_evDone.Wait(dwTimeout))
{
// timeout occurred
return VFW_E_TIMEOUT;
}
// get next event from list
CAsyncRequest* pRequest = GetDoneItem();
if(pRequest)
{
// found a completed request
// check if ok
HRESULT hr = pRequest->GetHResult();
if(hr == S_FALSE)
{
// this means the actual length was less than
// requested - may be ok if he aligned the end of file
if((pRequest->GetActualLength() +
pRequest->GetStart()) == Size())
{
hr = S_OK;
}
else
{
// it was an actual read error
hr = E_FAIL;
}
}
// return actual bytes read
*pcbActual = pRequest->GetActualLength();
// return his context
*ppContext = pRequest->GetContext();
*pdwUser = pRequest->GetUser();
delete pRequest;
return hr;
}
else
{
// Hold the critical section while checking the list state
CAutoLock lck(&m_csLists);
if(m_bFlushing && !m_bWaiting)
{
// can't block as we are between BeginFlush and EndFlush
// but note that if m_bWaiting is set, then there are some
// items not yet complete that we should block for.
return VFW_E_WRONG_STATE;
}
}
// done item was grabbed between completion and
// us locking m_csLists.
}
}
// perform a synchronous read request on this thread.
// Need to hold m_csFile while doing this (done in request object)
HRESULT
CAsyncIo::SyncReadAligned(
LONGLONG llPos,
LONG lLength,
BYTE * pBuffer,
LONG * pcbActual,
PVOID pvContext)
{
CheckPointer(pcbActual,E_POINTER);
if(!IsAligned(llPos) ||
!IsAligned(lLength) ||
!IsAligned((LONG_PTR) pBuffer))
{
return VFW_E_BADALIGN;
}
CAsyncRequest request;
HRESULT hr = request.Request(this,
m_pStream,
llPos,
lLength,
TRUE,
pBuffer,
pvContext,
0);
if(FAILED(hr))
return hr;
hr = request.Complete();
// return actual data length
*pcbActual = request.GetActualLength();
return hr;
}
HRESULT
CAsyncIo::Length(LONGLONG *pllTotal, LONGLONG *pllAvailable)
{
CheckPointer(pllTotal,E_POINTER);
*pllTotal = m_pStream->Size(pllAvailable);
return S_OK;
}
// cancel all items on the worklist onto the done list
// and refuse further requests or further WaitForNext calls
// until the end flush
//
// WaitForNext must return with NULL only if there are no successful requests.
// So Flush does the following:
// 1. set m_bFlushing ensures no more requests succeed
// 2. move all items from work list to the done list.
// 3. If there are any outstanding requests, then we need to release the
// critsec to allow them to complete. The m_bWaiting as well as ensuring
// that we are signalled when they are all done is also used to indicate
// to WaitForNext that it should continue to block.
// 4. Once all outstanding requests are complete, we force m_evDone set and
// m_bFlushing set and m_bWaiting false. This ensures that WaitForNext will
// not block when the done list is empty.
HRESULT
CAsyncIo::BeginFlush()
{
// hold the lock while emptying the work list
{
CAutoLock lock(&m_csLists);
// prevent further requests being queued.
// Also WaitForNext will refuse to block if this is set
// unless m_bWaiting is also set which it will be when we release
// the critsec if there are any outstanding).
m_bFlushing = TRUE;
CAsyncRequest * preq;
while((preq = GetWorkItem()) != 0)
{
preq->Cancel();
PutDoneItem(preq);
}
// now wait for any outstanding requests to complete
if(m_cItemsOut > 0)
{
// can be only one person waiting
ASSERT(!m_bWaiting);
// this tells the completion routine that we need to be
// signalled via m_evAllDone when all outstanding items are
// done. It also tells WaitForNext to continue blocking.
m_bWaiting = TRUE;
}
else
{
// all done
// force m_evDone set so that even if list is empty,
// WaitForNext will not block
// don't do this until we are sure that all
// requests are on the done list.
m_evDone.Set();
return S_OK;
}
}
ASSERT(m_bWaiting);
// wait without holding critsec
for(;;)
{
m_evAllDone.Wait();
{
// hold critsec to check
CAutoLock lock(&m_csLists);
if(m_cItemsOut == 0)
{
// now we are sure that all outstanding requests are on
// the done list and no more will be accepted
m_bWaiting = FALSE;
// force m_evDone set so that even if list is empty,
// WaitForNext will not block
// don't do this until we are sure that all
// requests are on the done list.
m_evDone.Set();
return S_OK;
}
}
}
}
// end a flushing state
HRESULT
CAsyncIo::EndFlush()
{
CAutoLock lock(&m_csLists);
m_bFlushing = FALSE;
ASSERT(!m_bWaiting);
// m_evDone might have been set by BeginFlush - ensure it is
// set IFF m_listDone is non-empty
if(m_listDone.GetCount() > 0)
{
m_evDone.Set();
}
else
{
m_evDone.Reset();
}
return S_OK;
}
// start the thread
HRESULT
CAsyncIo::StartThread(void)
{
if(m_hThread)
{
return S_OK;
}
// clear the stop event before starting
m_evStop.Reset();
DWORD dwThreadID;
m_hThread = CreateThread(NULL,
0,
InitialThreadProc,
this,
0,
&dwThreadID);
if(!m_hThread)
{
DWORD dwErr = GetLastError();
return HRESULT_FROM_WIN32(dwErr);
}
return S_OK;
}
// stop the thread and close the handle
HRESULT
CAsyncIo::CloseThread(void)
{
// signal the thread-exit object
m_evStop.Set();
if(m_hThread)
{
WaitForSingleObject(m_hThread, INFINITE);
CloseHandle(m_hThread);
m_hThread = NULL;
}
return S_OK;
}
// manage the list of requests. hold m_csLists and ensure
// that the (manual reset) event hevList is set when things on
// the list but reset when the list is empty.
// returns null if list empty
CAsyncRequest*
CAsyncIo::GetWorkItem()
{
CAutoLock lck(&m_csLists);
CAsyncRequest * preq = m_listWork.RemoveHead();
// force event set correctly
if(m_listWork.GetCount() == 0)
{
m_evWork.Reset();
}
return preq;
}
// get an item from the done list
CAsyncRequest*
CAsyncIo::GetDoneItem()
{
CAutoLock lock(&m_csLists);
CAsyncRequest * preq = m_listDone.RemoveHead();
// force event set correctly if list now empty
// or we're in the final stages of flushing
// Note that during flushing the way it's supposed to work is that
// everything is shoved on the Done list then the application is
// supposed to pull until it gets nothing more
//
// Thus we should not set m_evDone unconditionally until everything
// has moved to the done list which means we must wait until
// cItemsOut is 0 (which is guaranteed by m_bWaiting being TRUE).
if(m_listDone.GetCount() == 0 &&
(!m_bFlushing || m_bWaiting))
{
m_evDone.Reset();
}
return preq;
}
// put an item on the work list - fail if bFlushing
HRESULT
CAsyncIo::PutWorkItem(CAsyncRequest* pRequest)
{
CAutoLock lock(&m_csLists);
HRESULT hr;
if(m_bFlushing)
{
hr = VFW_E_WRONG_STATE;
}
else if(m_listWork.AddTail(pRequest))
{
// event should now be in a set state - force this
m_evWork.Set();
// start the thread now if not already started
hr = StartThread();
}
else
{
hr = E_OUTOFMEMORY;
}
return(hr);
}
// put an item on the done list - ok to do this when
// flushing
HRESULT
CAsyncIo::PutDoneItem(CAsyncRequest* pRequest)
{
ASSERT(CritCheckIn(&m_csLists));
if(m_listDone.AddTail(pRequest))
{
// event should now be in a set state - force this
m_evDone.Set();
return S_OK;
}
else
{
return E_OUTOFMEMORY;
}
}
// called on thread to process any active requests
void
CAsyncIo::ProcessRequests(void)
{
// lock to get the item and increment the outstanding count
CAsyncRequest * preq = NULL;
for(;;)
{
{
CAutoLock lock(&m_csLists);
preq = GetWorkItem();
if(preq == NULL)
{
// done
return;
}
// one more item not on the done or work list
m_cItemsOut++;
// release critsec
}
preq->Complete();
// regain critsec to replace on done list
{
CAutoLock l(&m_csLists);
PutDoneItem(preq);
if(--m_cItemsOut == 0)
{
if(m_bWaiting)
m_evAllDone.Set();
}
}
}
}
// the thread proc - assumes that DWORD thread param is the
// this pointer
DWORD
CAsyncIo::ThreadProc(void)
{
HANDLE ahev[] = {m_evStop, m_evWork};
for(;;)
{
DWORD dw = WaitForMultipleObjects(2,
ahev,
FALSE,
INFINITE);
if(dw == WAIT_OBJECT_0+1)
{
// requests need processing
ProcessRequests();
}
else
{
// any error or stop event - we should exit
return 0;
}
}
}
// perform a synchronous read request on this thread.
// may not be aligned - so we will have to buffer.
HRESULT
CAsyncIo::SyncRead(
LONGLONG llPos,
LONG lLength,
BYTE * pBuffer)
{
if(IsAligned(llPos) &&
IsAligned(lLength) &&
IsAligned((LONG_PTR) pBuffer))
{
LONG cbUnused;
return SyncReadAligned(llPos, lLength, pBuffer, &cbUnused, NULL);
}
// not aligned with requirements - use buffered file handle.
//!!! might want to fix this to buffer the data ourselves?
CAsyncRequest request;
HRESULT hr = request.Request(this,
m_pStream,
llPos,
lLength,
FALSE,
pBuffer,
NULL,
0);
if(FAILED(hr))
{
return hr;
}
return request.Complete();
}
// Return the alignment
HRESULT
CAsyncIo::Alignment(LONG *pAlignment)
{
CheckPointer(pAlignment,E_POINTER);
*pAlignment = Alignment();
return S_OK;
}
↑ V668 There is no sense in testing the 'pRequest' pointer against null, as the memory was allocated using the 'new' operator. The exception will be generated in the case of memory allocation error.
↑ V513 Use _beginthreadex/_endthreadex functions instead of CreateThread/ExitThread functions.