//------------------------------------------------------------------------------
// File: OutputQ.cpp
//
// Desc: DirectShow base classes - implements COutputQueue class used by an
//       output pin which may sometimes want to queue output samples on a
//       separate thread and sometimes call Receive() directly on the input
//       pin.
//
// Copyright (c) 1992-2001 Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------------------------
 
 
#include "streams.h"
 
 
//
//  COutputQueue Constructor :
//
//  Determines if a thread is to be created and creates resources
//
//     pInputPin  - the downstream input pin we're queueing samples to
//
//     phr        - changed to a failure code if this function fails
//                  (otherwise unchanged)
//
//     bAuto      - Ask pInputPin if it can block in Receive by calling
//                  its ReceiveCanBlock method and create a thread if
//                  it can block, otherwise not.
//
//     bQueue     - if bAuto == FALSE then we create a thread if and only
//                  if bQueue == TRUE
//
//     lBatchSize - work in batches of lBatchSize
//
//     bBatchExact - Use exact batch sizes so don't send until the
//                  batch is full or SendAnyway() is called
//
//     lListSize  - If we create a thread make the list of samples queued
//                  to the thread have this size cache
//
//     dwPriority - If we create a thread set its priority to this
//
COutputQueue::COutputQueue(
             IPin         *pInputPin,          //  Pin to send stuff to
             __inout HRESULT      *phr,        //  'Return code'
             BOOL          bAuto,              //  Ask pin if queue or not
             BOOL          bQueue,             //  Send through queue
             LONG          lBatchSize,         //  Batch
             BOOL          bBatchExact,        //  Batch exactly to BatchSize
             LONG          lListSize,
             DWORD         dwPriority,
             bool          bFlushingOpt        // flushing optimization
            ) : m_lBatchSize(lBatchSize),
                m_bBatchExact(bBatchExact && (lBatchSize > 1)),
                m_hThread(NULL),
                m_hSem(NULL),
                m_List(NULL),
                m_pPin(pInputPin),
                m_ppSamples(NULL),
                m_lWaiting(0),
                m_evFlushComplete(FALSE, phr),
                m_pInputPin(NULL),
                m_bSendAnyway(FALSE),
                m_nBatched(0),
                m_bFlushing(FALSE),
                m_bFlushed(TRUE),
                m_bFlushingOpt(bFlushingOpt),
                m_bTerminate(FALSE),
                m_hEventPop(NULL),
                m_hr(S_OK)
{
    ASSERT(m_lBatchSize > 0);


    //  A failure may already have been recorded in *phr (for example by
    //  the construction of m_evFlushComplete above) - don't go any further

    if (FAILED(*phr)) {
        return;
    }

    //  Reject arguments that would otherwise be dereferenced (pInputPin)
    //  or used as an array length (lBatchSize) below.  The ASSERT above
    //  only catches a bad batch size in debug builds.

    if (pInputPin == NULL) {
        *phr = E_POINTER;
        return;
    }
    if (m_lBatchSize <= 0) {
        *phr = E_INVALIDARG;
        return;
    }

    //  Check the input pin is OK and cache its IMemInputPin interface
    //  (the reference obtained here is released in the destructor)

    *phr = pInputPin->QueryInterface(IID_IMemInputPin, (void **)&m_pInputPin);
    if (FAILED(*phr)) {
        return;
    }

    // See if we should ask the downstream pin.  If the call fails the
    // caller's bQueue value is left as it was.

    if (bAuto) {
        HRESULT hr = m_pInputPin->ReceiveCanBlock();
        if (SUCCEEDED(hr)) {
            bQueue = hr == S_OK;
        }
    }

    //  Create our sample batch
    //  (the NULL test only matters in builds where operator new reports
    //  failure by returning NULL rather than throwing)

    m_ppSamples = new PMEDIASAMPLE[m_lBatchSize];
    if (m_ppSamples == NULL) {
        *phr = E_OUTOFMEMORY;
        return;
    }

    //  If we're queueing allocate resources : semaphore, sample list
    //  and the worker thread - in that order

    if (bQueue) {
        DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin")));
        m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
        if (m_hSem == NULL) {
            DWORD dwError = GetLastError();
            *phr = AmHresultFromWin32(dwError);
            return;
        }
        m_List = new CSampleList(NAME("Sample Queue List"),
                                 lListSize,
                                 FALSE         // No lock
                                );
        if (m_List == NULL) {
            *phr = E_OUTOFMEMORY;
            return;
        }


        DWORD dwThreadId;
        m_hThread = CreateThread(NULL,
                                 0,
                                 InitialThreadProc,
                                 (LPVOID)this,
                                 0,
                                 &dwThreadId);
        if (m_hThread == NULL) {
            DWORD dwError = GetLastError();
            *phr = AmHresultFromWin32(dwError);
            return;
        }
        SetThreadPriority(m_hThread, dwPriority);
    } else {
        DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread")));
    }
}
 
//
//  COutputQueue Destructor :
//
//  Free all resources -
//
//      Thread,
//      Sample list,
//      Batched samples,
//      Cached IMemInputPin reference
//
COutputQueue::~COutputQueue()
{
    DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue")));
    if (m_hThread != NULL) {
        {
            //  Ask the worker thread to exit and wake it if it is waiting
            CAutoLock lck(this);
            m_bTerminate = TRUE;
            m_hr = S_FALSE;
            NotifyThread();
        }
        DbgWaitForSingleObject(m_hThread);
        EXECUTE_ASSERT(CloseHandle(m_hThread));

        //  The thread frees the samples when asked to terminate

        ASSERT(m_List->GetCount() == 0);
    } else {
        FreeSamples();
    }

    //  The list is deleted on both paths: the constructor can fail after
    //  allocating the list but before the thread exists (CreateThread
    //  failure), in which case m_hThread is NULL but m_List is not.
    //  Deleting a NULL pointer is harmless.

    delete m_List;

    if (m_hSem != NULL) {
        EXECUTE_ASSERT(CloseHandle(m_hSem));
    }
    delete [] m_ppSamples;

    //  Release the cached interface last - ThreadProc() calls through
    //  m_pInputPin, so our reference is kept until the thread has exited

    if (m_pInputPin != NULL) {
        m_pInputPin->Release();
    }
}
 
//
//  Static thread entry point - pv is the COutputQueue whose ThreadProc()
//  member does the real work; its return value becomes the thread's
//  exit code
//
DWORD WINAPI COutputQueue::InitialThreadProc(__in LPVOID pv)
{
    //  Keep the helper's result : CoUninitialize() is only called below
    //  when the helper reported S_OK
    const HRESULT hrInit = CAMThread::CoInitializeHelper();

    COutputQueue * const pQueue = static_cast<COutputQueue *>(pv);
    const DWORD dwExitCode = pQueue->ThreadProc();

    if (S_OK == hrInit) {
        CoUninitialize();
    }

    return dwExitCode;
}
 
//
//  Thread sending the samples downstream :
//
//  When there is nothing to do the thread sets m_lWaiting (while
//  holding the critical section) and then waits for m_hSem to be
//  set (not holding the critical section)
//
//  Returns 0 once m_bTerminate has been seen (after freeing all samples)
//
DWORD COutputQueue::ThreadProc()
{
    for (;;) {
        BOOL          bWait = FALSE;
        IMediaSample *pSample;
        LONG          lNumberToSend = 0; // Local copy
        NewSegmentPacket* ppacket = NULL;

        //
        //  Get a batch of samples and send it if possible
        //  In any case exit the loop if there is a control action
        //  requested
        //
        {
            CAutoLock lck(this);
            for (;;) {

                if (m_bTerminate) {
                    FreeSamples();
                    return 0;
                }
                if (m_bFlushing) {
                    FreeSamples();
                    SetEvent(m_evFlushComplete);
                }

                //  Get a sample off the list

                pSample = m_List->RemoveHead();
                // inform derived class we took something off the queue
                if (m_hEventPop) {
                    //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
                    SetEvent(m_hEventPop);
                }

                if (pSample != NULL &&
                    !IsSpecialSample(pSample)) {

                    //  If its just a regular sample just add it to the batch
                    //  and exit the loop if the batch is full

                    m_ppSamples[m_nBatched++] = pSample;
                    if (m_nBatched == m_lBatchSize) {
                        break;
                    }
                } else {

                    //  If there was nothing in the queue and there's nothing
                    //  to send (either because there's nothing or the batch
                    //  isn't full) then prepare to wait

                    if (pSample == NULL &&
                        (m_bBatchExact || m_nBatched == 0)) {

                        //  Tell other thread to set the event when there's
                        //  something to do

                        ASSERT(m_lWaiting == 0);
                        m_lWaiting++;
                        bWait      = TRUE;
                    } else {

                        //  We break out of the loop on SEND_PACKET unless
                        //  there's nothing to send

                        if (pSample == SEND_PACKET && m_nBatched == 0) {
                            continue;
                        }

                        if (pSample == NEW_SEGMENT) {
                            // now we need the parameters - we are
                            // guaranteed that the next packet contains them
                            ppacket = (NewSegmentPacket *) m_List->RemoveHead();
                            // we took something off the queue
                            if (m_hEventPop) {
                                //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
                                SetEvent(m_hEventPop);
                            }

                            ASSERT(ppacket);
                        }
                        //  EOS_PACKET falls through here and we exit the loop
                        //  In this way it acts like SEND_PACKET
                    }
                    break;
                }
            }
            if (!bWait) {
                // We look at m_nBatched from the client side so keep
                // it up to date inside the critical section
                lNumberToSend = m_nBatched;  // Local copy
                m_nBatched = 0;
            }
        }

        //  Wait for some more data

        if (bWait) {
            DbgWaitForSingleObject(m_hSem);
            continue;
        }



        //  OK - send it if there's anything to send
        //  We DON'T check m_bBatchExact here because either we've got
        //  a full batch or we dropped through because we got
        //  SEND_PACKET or EOS_PACKET - both of which imply we should
        //  flush our batch

        if (lNumberToSend != 0) {
            long nProcessed;
            if (m_hr == S_OK) {
                ASSERT(!m_bFlushed);
                HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
                                                          lNumberToSend,
                                                          &nProcessed);
                /*  Don't overwrite a flushing state HRESULT */
                CAutoLock lck(this);
                if (m_hr == S_OK) {
                    m_hr = hr;
                }
                ASSERT(!m_bFlushed);
            }

            //  The batch is always released, whether or not it was sent

            while (lNumberToSend != 0) {
                m_ppSamples[--lNumberToSend]->Release();
            }
            if (m_hr != S_OK) {

                //  In any case wait for more data - S_OK just
                //  means there wasn't an error

                DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"),
                       m_hr));
            }
        }

        //  Check for end of stream

        if (pSample == EOS_PACKET) {

            //  We don't send even end of stream on if we've previously
            //  returned something other than S_OK
            //  This is because in that case the pin which returned
            //  something other than S_OK should have either sent
            //  EndOfStream() or notified the filter graph

            if (m_hr == S_OK) {
                DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
                HRESULT hr = m_pPin->EndOfStream();
                if (FAILED(hr)) {
                    //  The format string takes the failure code as its argument
                    DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()"), hr));
                }
            }
        }

        //  Data from a new source

        if (pSample == RESET_PACKET) {
            m_hr = S_OK;
            SetEvent(m_evFlushComplete);
        }

        //  New segment - ppacket was taken off the list (inside the
        //  critical section above) immediately after the NEW_SEGMENT marker.
        //  The ASSERT above only checks it in debug builds, so guard the
        //  dereference here as well.

        if (pSample == NEW_SEGMENT) {
            if (ppacket != NULL) {
                m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate);
                delete ppacket;
            }
        }
    }
}
 
//  Send batched stuff anyway
void COutputQueue::SendAnyway()
{
    if (!IsQueued()) {
 
        //  m_bSendAnyway is a private parameter checked in ReceiveMultiple
 
        m_bSendAnyway = TRUE;
        LONG nProcessed;
        ReceiveMultiple(NULL, 0, &nProcessed);
        m_bSendAnyway = FALSE;
 
    } else {
        CAutoLock lck(this);
        QueueSample(SEND_PACKET);
        NotifyThread();
    }
}
 
//  Pass a new segment downstream - directly if there is no thread,
//  otherwise in order through the sample queue.  Nothing is passed on
//  while the sticky return code m_hr is not S_OK.
void
COutputQueue::NewSegment(
    REFERENCE_TIME tStart,
    REFERENCE_TIME tStop,
    double dRate)
{
    if (!IsQueued()) {
        if (m_hr != S_OK) {
            return;
        }

        //  With exact batching there may be samples held back - get them
        //  sent before the segment notification

        if (m_bBatchExact) {
            SendAnyway();
        }
        m_pPin->NewSegment(tStart, tStop, dRate);
        return;
    }

    if (m_hr != S_OK) {
        return;
    }

    //
    // The new segment has to appear in order in the data, but it also
    // carries parameters.  Instead of wrapping every sample so that
    // special ones can be told apart, special pointer values are queued
    // to mark special packets, and (by holding the critical section
    // across both AddTail calls) the entry immediately following a
    // NEW_SEGMENT value is guaranteed to be the NewSegmentPacket with
    // the parameters.
    //
    NewSegmentPacket * const pParams = new NewSegmentPacket;
    if (NULL == pParams) {
        return;
    }
    pParams->tStart = tStart;
    pParams->tStop  = tStop;
    pParams->dRate  = dRate;

    CAutoLock lck(this);
    QueueSample(NEW_SEGMENT);
    QueueSample((IMediaSample *) pParams);
    NotifyThread();
}
 
 
//
//  End of Stream is queued to output device
//
//  With no thread EndOfStream() is called on the pin directly (after
//  sending any exact batch still held); with a thread an EOS_PACKET is
//  queued.  Nothing is sent if the sticky return code is not S_OK.
//
void COutputQueue::EOS()
{
    CAutoLock lck(this);
    if (!IsQueued()) {
        if (m_bBatchExact) {
            SendAnyway();
        }
        if (m_hr == S_OK) {
            DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
            m_bFlushed = FALSE;
            HRESULT hr = m_pPin->EndOfStream();
            if (FAILED(hr)) {
                //  The format string takes the failure code as its argument
                DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()"), hr));
            }
        }
    } else {
        if (m_hr == S_OK) {
            m_bFlushed = FALSE;
            QueueSample(EOS_PACKET);
            NotifyThread();
        }
    }
}
 
//
//  Enter the flushing state - queued and batched data is to be discarded
//  and the flush is passed to the downstream pin
//
void COutputQueue::BeginFlush()
{
    if (!IsQueued()) {

        //  No thread : go downstream first (avoids deadlocks), then mark
        //  ourselves as flushing under the lock

        m_pPin->BeginFlush();

        CAutoLock lck(this);
        m_bFlushing = TRUE;

        //  From now on every sample is discarded
        if (S_OK == m_hr) {
            m_hr = S_FALSE;
        }
        return;
    }

    {
        CAutoLock lck(this);

        //  Blocking of receives is assumed to be done by the filter we
        //  are a component of

        //  Discard everything that is queued
        m_bFlushing = TRUE;

        //  From now on every sample is discarded
        if (S_OK == m_hr) {
            m_hr = S_FALSE;
        }

        //  Optimization - nothing has gone downstream since the last
        //  flush so there is no need to call downstream again
        if (m_bFlushed && m_bFlushingOpt) {
            return;
        }

        //  Make sure EndFlush really waits for the flush to complete
        m_evFlushComplete.Reset();

        NotifyThread();
    }

    //  Pass the flush downstream (critical section no longer held)
    m_pPin->BeginFlush();
}
 
//
//  Leave the flushing state and pass the end of the flush downstream
//
void COutputQueue::EndFlush()
{
    {
        CAutoLock lck(this);
        ASSERT(m_bFlushing);

        //  Counterpart of the shortcut in BeginFlush : nothing was sent
        //  downstream, so just clear our own state
        const bool bSkipDownstream =
            m_bFlushingOpt && m_bFlushed && IsQueued();
        if (bSkipDownstream) {
            m_bFlushing = FALSE;
            m_hr = S_OK;
            return;
        }
    }

    //  Synchronizing with the pushing thread and making sure no more data
    //  goes downstream were both arranged in BeginFlush.
    //
    //  The critical section is deliberately NOT held while we wait here
    //  (holding it would deadlock).

    if (!IsQueued()) {
        FreeSamples();
    } else {
        m_evFlushComplete.Wait();
    }

    //  Be daring - the caller has guaranteed that no samples arrive
    //  before EndFlush() returns

    m_bFlushing = FALSE;
    m_bFlushed  = TRUE;

    //  Tell the downstream pin the flush is over

    m_pPin->EndFlush();

    m_hr = S_OK;
}
 
//  COutputQueue::QueueSample
//
//  Private helper : append pSample to the tail of the sample list.
//  If the list refuses the entry, an ordinary sample is Release()'d;
//  special marker values are simply dropped.
//
//  The critical section MUST be held when this is called

void COutputQueue::QueueSample(IMediaSample *pSample)
{
    const bool bAdded = (m_List->AddTail(pSample) != NULL);
    if (bAdded || IsSpecialSample(pSample)) {
        return;
    }
    pSample->Release();
}
 
//
//  COutputQueue::Receive()
//
//  Deliver one sample - implemented as a one element call to
//  ReceiveMultiple (this could be optimized if it ever mattered)
//
//  The sample will have been Release()'d by the time this returns
//

HRESULT COutputQueue::Receive(IMediaSample *pSample)
{
    LONG cHandled;
    const HRESULT hrResult = ReceiveMultiple(&pSample, 1, &cHandled);
    return hrResult;
}
 
//
//  COutputQueue::ReceiveMultiple()
//
//  Send a set of samples to the downstream pin
//
//      ppSamples           - array of samples
//      nSamples            - how many
//      nSamplesProcessed   - How many were processed
//
//  Returns E_INVALIDARG if nSamples is negative, otherwise the sticky
//  return code m_hr (direct case) or S_OK / m_hr (queued case)
//
//  On return all samples will have been Release()'d
//

HRESULT COutputQueue::ReceiveMultiple (
    __in_ecount(nSamples) IMediaSample **ppSamples,
    long nSamples,
    __out long *nSamplesProcessed)
{
    if (nSamples < 0) {
        return E_INVALIDARG;
    }
    
    CAutoLock lck(this);
    //  Either call directly or queue up the samples

    if (!IsQueued()) {

        //  If we already had a bad return code then just return

        if (S_OK != m_hr) {

            //  If we've never received anything since the last Flush()
            //  and the sticky return code is not S_OK we must be
            //  flushing
            //  ((!A || B) is equivalent to A implies B)
            ASSERT(!m_bFlushed || m_bFlushing);

            //  We're supposed to Release() them anyway!
            //  (logged once per call, as in the queued case below)
            *nSamplesProcessed = 0;
            DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
                    nSamples, m_hr));
            for (int i = 0; i < nSamples; i++) {
                ppSamples[i]->Release();
            }

            return m_hr;
        }
        //
        //  If we're flushing the sticky return code should be S_FALSE
        //
        ASSERT(!m_bFlushing);
        m_bFlushed = FALSE;

        ASSERT(m_nBatched < m_lBatchSize);
        ASSERT(m_nBatched == 0 || m_bBatchExact);

        //  Loop processing the samples in batches

        LONG iLost = 0;
        long iDone = 0;
        for (iDone = 0;
             iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway);
            ) {

//pragma message (REMIND("Implement threshold scheme"))
            ASSERT(m_nBatched < m_lBatchSize);
            if (iDone < nSamples) {
                m_ppSamples[m_nBatched++] = ppSamples[iDone++];
            }

            //  Send when the batch is full, or when the caller's samples
            //  are exhausted and we are not holding out for an exact
            //  batch.  The exhaustion test must be on iDone : nSamples is
            //  never decremented, so testing it against 0 would leave a
            //  trailing partial batch unsent in non-exact mode and break
            //  the (m_nBatched == 0 || m_bBatchExact) invariant asserted
            //  on entry.

            if (m_nBatched == m_lBatchSize ||
                (iDone == nSamples && (m_bSendAnyway || !m_bBatchExact))) {

                //  Start from 0 so a failing downstream call that does
                //  not write its count cannot leave garbage in iLost
                LONG nDone = 0;
                DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
                       m_nBatched));

                if (m_hr == S_OK) {
                    m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
                                                        m_nBatched,
                                                        &nDone);
                } else {
                    nDone = 0;
                }
                iLost += m_nBatched - nDone;
                for (LONG i = 0; i < m_nBatched; i++) {
                    m_ppSamples[i]->Release();
                }
                m_nBatched = 0;
            }
        }

        //  iLost can count samples batched by an earlier call, hence
        //  the clamp

        *nSamplesProcessed = iDone - iLost;
        if (*nSamplesProcessed < 0) {
            *nSamplesProcessed = 0;
        }
        return m_hr;
    } else {
        /*  We're sending to our thread */

        if (m_hr != S_OK) {
            *nSamplesProcessed = 0;
            DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
                    nSamples, m_hr));
            for (int i = 0; i < nSamples; i++) {
                ppSamples[i]->Release();
            }
            return m_hr;
        }
        m_bFlushed = FALSE;
        for (long i = 0; i < nSamples; i++) {
            QueueSample(ppSamples[i]);
        }
        *nSamplesProcessed = nSamples;

        //  With exact batching only wake the thread once a whole batch
        //  is available

        if (!m_bBatchExact ||
            m_nBatched + m_List->GetCount() >= m_lBatchSize) {
            NotifyThread();
        }
        return S_OK;
    }
}
 
//  Prepare for new data by clearing the sticky return code m_hr
void COutputQueue::Reset()
{
    if (IsQueued()) {

        //  Threaded case - the worker thread clears m_hr when it reaches
        //  the RESET_PACKET; wait (without the lock) until it says so

        {
            CAutoLock lck(this);
            QueueSample(RESET_PACKET);
            NotifyThread();
        }
        m_evFlushComplete.Wait();
        return;
    }

    m_hr = S_OK;
}
 
//  Remove and Release() all queued and Batched samples
void COutputQueue::FreeSamples()
{
    CAutoLock lck(this);
    if (IsQueued()) {
        for (;;) {
            IMediaSample *pSample = m_List->RemoveHead();
	    // inform derived class we took something off the queue
	    if (m_hEventPop) {
                //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
	        SetEvent(m_hEventPop);
	    }
 
            if (pSample == NULL) {
                break;
            }
            if (!IsSpecialSample(pSample)) {
                pSample->Release();
            } else {
                if (pSample == NEW_SEGMENT) {
                    //  Free NEW_SEGMENT packet
                    NewSegmentPacket *ppacket =
                        (NewSegmentPacket *) m_List->RemoveHead();
		    // inform derived class we took something off the queue
		    if (m_hEventPop) {
                        //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
		        SetEvent(m_hEventPop);
		    }
 
                    ASSERT(ppacket != NULL);
                    delete ppacket;
                }
            }
        }
    }
    for (int i = 0; i < m_nBatched; i++) {
        m_ppSamples[i]->Release();
    }
    m_nBatched = 0;
}
 
//  Wake the worker thread if it is waiting for something to do
//
//  The critical section MUST be held when this is called
void COutputQueue::NotifyThread()
{
    ASSERT(IsQueued());

    //  Nobody waiting - no need to touch the semaphore
    if (m_lWaiting == 0) {
        return;
    }

    ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
    m_lWaiting = 0;
}
 
//  Report whether there is any outstanding work
//  Returns
//      TRUE  if there is nothing on the queue and nothing in the batch
//            and all data has been sent
//      FALSE otherwise
//
BOOL COutputQueue::IsIdle()
{
    CAutoLock lck(this);

    //  Idle means
    //      either no thread exists (!IsQueued()) or the thread is
    //      waiting for more work (m_lWaiting != 0)
    //  AND
    //      the current batch is empty (m_nBatched == 0)

    const bool bThreadBusy = IsQueued() && (m_lWaiting == 0);
    if (bThreadBusy || m_nBatched != 0) {
        return FALSE;
    }

    //  When idle there shouldn't be anything left on the work queue

    ASSERT(!IsQueued() || m_List->GetCount() == 0);
    return TRUE;
}
 
 
//  Store the event that gets set each time an entry is taken off the
//  sample list (a NULL handle means no event is set)
void COutputQueue::SetPopEvent(HANDLE hEvent)
{
    this->m_hEventPop = hEvent;
}

// Static-analysis diagnostics recorded for this file:
//
// V648 Priority of the '&&' operation is higher than that of the '||' operation.
//
// V668 There is no sense in testing the 'm_ppSamples' pointer against null, as the memory was allocated using the 'new' operator. The exception will be generated in the case of memory allocation error.
//
// V668 There is no sense in testing the 'm_List' pointer against null, as the memory was allocated using the 'new' operator. The exception will be generated in the case of memory allocation error.
//
// V668 There is no sense in testing the 'ppack' pointer against null, as the memory was allocated using the 'new' operator. The exception will be generated in the case of memory allocation error.
//
// V1027 Pointer to an object of the 'NewSegmentPacket' class is cast to unrelated 'IMediaSample' class.
//
// V513 Use _beginthreadex/_endthreadex functions instead of CreateThread/ExitThread functions.
//
// V522 There might be dereferencing of a potential null pointer 'ppacket'.