@@ -248,9 +248,15 @@ void BatchedThreadedNnet3CudaPipeline::OpenDecodeHandle(
   task->callback = std::move(callback);
   task->Init(key, wave_data);
 
-  work_pool_->enqueue(THREAD_POOL_LOW_PRIORITY,
-                      &BatchedThreadedNnet3CudaPipeline::ComputeOneFeature,
-                      this, task);
+  if (config_.gpu_feature_extract) {
+    // Feature extraction done on device
+    AddTaskToPendingTaskQueue(task);
+  } else {
+    // Feature extraction done on host thread
+    work_pool_->enqueue(THREAD_POOL_LOW_PRIORITY,
+                        &BatchedThreadedNnet3CudaPipeline::ComputeOneFeatureCPU,
+                        this, task);
+  }
 }
 
 void BatchedThreadedNnet3CudaPipeline::OpenDecodeHandle(
@@ -262,9 +268,15 @@ void BatchedThreadedNnet3CudaPipeline::OpenDecodeHandle(
   task->callback = std::move(callback);
   task->Init(key, wave_data, sample_rate);
 
-  work_pool_->enqueue(THREAD_POOL_LOW_PRIORITY,
-                      &BatchedThreadedNnet3CudaPipeline::ComputeOneFeature,
-                      this, task);
+  if (config_.gpu_feature_extract) {
+    // Feature extraction done on device
+    AddTaskToPendingTaskQueue(task);
+  } else {
+    // Feature extraction done on host thread
+    work_pool_->enqueue(THREAD_POOL_LOW_PRIORITY,
+                        &BatchedThreadedNnet3CudaPipeline::ComputeOneFeatureCPU,
+                        this, task);
+  }
 }
 
 bool BatchedThreadedNnet3CudaPipeline::GetRawLattice(const std::string &key,
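
Both OpenDecodeHandle overloads now branch on config_.gpu_feature_extract. As a reference for readers, below is a minimal sketch of how such a flag is typically wired through Kaldi's options mechanism; the member name comes from this diff, but the struct layout, default value, and option string are assumptions not shown in this change:

    // Hypothetical sketch; only gpu_feature_extract appears in this diff.
    struct BatchedThreadedNnet3CudaPipelineConfig {
      bool gpu_feature_extract = true;  // assumed default

      void Register(OptionsItf *po) {
        po->Register("gpu-feature-extract", &gpu_feature_extract,
                     "If true, extract features on the device and queue the "
                     "task directly; otherwise run ComputeOneFeatureCPU() on "
                     "a host worker thread.");
      }
    };
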
@@ -410,6 +422,8 @@ void BatchedThreadedNnet3CudaPipeline::AquireAdditionalTasks(
 void BatchedThreadedNnet3CudaPipeline::ComputeBatchNnet(
     nnet3::NnetBatchComputer &computer, int32 first,
     std::vector<TaskState *> &tasks) {
+  nvtxRangePushA("ComputeBatchNnet");
+
   bool output_to_cpu = false;
   int32 online_ivector_period = 0;
   int max_pending_minibatches =
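
The nvtxRangePushA/nvtxRangePop pairs added throughout this change delimit named regions that appear on the timeline in Nsight Systems or nvprof. A self-contained sketch of the pattern, with an illustrative function name:

    #include <nvToolsExt.h>  // link with -lnvToolsExt

    void ProcessBatch() {              // illustrative function name
      nvtxRangePushA("ProcessBatch");  // open a named range on this thread
      // ... the work to be profiled ...
      nvtxRangePop();                  // close the innermost open range
    }
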
@@ -421,17 +435,32 @@ void BatchedThreadedNnet3CudaPipeline::ComputeBatchNnet(
   // for all new batches enqueue up nnet work.
   for (int i = first; i < tasks.size(); i++) {
     TaskState &task = *tasks[i];
-    Vector<BaseFloat> &ivector_features = task.task_data->ivector_features;
-    Matrix<BaseFloat> &input_features = task.task_data->input_features;
+    std::shared_ptr<TaskData> &task_data = task.task_data;
     std::vector<nnet3::NnetInferenceTask> &ntasks = nnet_tasks[i];
+
+    if (config_.gpu_feature_extract) {
+      CuVector<BaseFloat> &ivector_features = task_data->ivector_features;
+      CuMatrix<BaseFloat> &input_features = task_data->input_features;
+
+      CuVector<BaseFloat> *ifeat = NULL;
+      if (ivector_features.Dim() > 0) {
+        ifeat = &ivector_features;
+      }
+      // create task list
+      computer.SplitUtteranceIntoTasks(output_to_cpu, input_features, ifeat,
+                                       NULL, online_ivector_period, &ntasks);
+    } else {
+      Vector<BaseFloat> &ivector_features = task_data->ivector_features_cpu;
+      Matrix<BaseFloat> &input_features = task_data->input_features_cpu;
 
-    Vector<BaseFloat> *ifeat = NULL;
-    if (ivector_features.Dim() > 0) {
-      ifeat = &ivector_features;
+      Vector<BaseFloat> *ifeat = NULL;
+      if (ivector_features.Dim() > 0) {
+        ifeat = &ivector_features;
+      }
+      // create task list
+      computer.SplitUtteranceIntoTasks(output_to_cpu, input_features, ifeat,
+                                       NULL, online_ivector_period, &ntasks);
     }
-    // create task list
-    computer.SplitUtteranceIntoTasks(output_to_cpu, input_features, ifeat, NULL,
-                                     online_ivector_period, &ntasks);
 
     // Add tasks to computer
     for (size_t j = 0; j < ntasks.size(); j++) {
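
The new task_data alias replaces the repeated task.task_data-> chains, and the gpu_feature_extract branch implies TaskData now carries both device-side (CuMatrix/CuVector) and host-side (*_cpu) feature buffers. A sketch of the members this diff touches; the real struct lives in the pipeline header and may differ:

    // Inferred from usage in this diff only; the wave_samples type is an
    // assumption.
    struct TaskData {
      std::shared_ptr<SubVector<BaseFloat>> wave_samples;  // raw audio
      BaseFloat sample_frequency;

      CuMatrix<BaseFloat> input_features;    // device features (GPU path)
      CuVector<BaseFloat> ivector_features;

      Matrix<BaseFloat> input_features_cpu;  // host features (CPU path)
      Vector<BaseFloat> ivector_features_cpu;

      CuMatrix<BaseFloat> posteriors;        // nnet output for the decoder
    };
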
@@ -448,33 +477,37 @@ void BatchedThreadedNnet3CudaPipeline::ComputeBatchNnet(
   // Extract Posteriors
   for (int i = first; i < tasks.size(); i++) {
     TaskState &task = *tasks[i];
-    CuMatrix<BaseFloat> &posteriors = task.task_data->posteriors;
+    std::shared_ptr<TaskData> &task_data = task.task_data;
+    CuMatrix<BaseFloat> &posteriors = task_data->posteriors;
     MergeTaskOutput(nnet_tasks[i], &posteriors);
 
     // nnet output is no longer necessary as we have copied the output out
     nnet_tasks[i].resize(0);
 
     // features are no longer needed so free memory
-    task.task_data->ivector_features.Resize(0);
-    task.task_data->input_features.Resize(0, 0);
+    task_data->ivector_features.Resize(0);
+    task_data->input_features.Resize(0, 0);
   }
+
+  nvtxRangePop();
 }
 
 // Computes Features for a single decode instance.
-void BatchedThreadedNnet3CudaPipeline::ComputeOneFeature(TaskState *task_) {
-  nvtxRangePushA("ComputeOneFeature");
+void BatchedThreadedNnet3CudaPipeline::ComputeOneFeatureCPU(TaskState *task_) {
+  nvtxRangePushA("ComputeOneFeatureCPU");
   TaskState &task = *task_;
-  Vector<BaseFloat> &ivector_features = task.task_data->ivector_features;
-  Matrix<BaseFloat> &input_features = task.task_data->input_features;
+  std::shared_ptr<TaskData> &task_data = task.task_data;
+  Vector<BaseFloat> &ivector_features = task_data->ivector_features_cpu;
+  Matrix<BaseFloat> &input_features = task_data->input_features_cpu;
 
   // create decoding state
   OnlineNnet2FeaturePipeline feature(*feature_info_);
 
   // Accept waveforms
   feature.AcceptWaveform(
-      task.task_data->sample_frequency,
-      SubVector<BaseFloat>(*task.task_data->wave_samples, 0,
-                           task.task_data->wave_samples->Dim()));
+      task_data->sample_frequency,
+      SubVector<BaseFloat>(*task_data->wave_samples, 0,
+                           task_data->wave_samples->Dim()));
   feature.InputFinished();
   // All frames should be ready here
   int32 numFrames = feature.NumFramesReady();
@@ -487,7 +520,8 @@ void BatchedThreadedNnet3CudaPipeline::ComputeOneFeature(TaskState *task_) {
 
   std::vector<int> frames(numFrames);
   // create list of frames
-  for (int j = 0; j < numFrames; j++) frames[j] = j;
+  for (int j = 0; j < numFrames; j++)
+    frames[j] = j;
 
   // Copy Features
   input_features.Resize(numFrames, input_dim);
@@ -501,18 +535,106 @@ void BatchedThreadedNnet3CudaPipeline::ComputeOneFeature(TaskState *task_) {
     // Copy Features
     feature.IvectorFeature()->GetFrame(numFrames - 1, &ivector_features);
   }
-  nvtxRangePop();
 
   AddTaskToPendingTaskQueue(task_);
+
+  nvtxRangePop();
 }
 
+// Computes features for tasks in the range tasks[first,tasks.size())
+void BatchedThreadedNnet3CudaPipeline::ComputeBatchFeatures(
+    int32 first, std::vector<TaskState *> &tasks,
+    OnlineCudaFeaturePipeline &feature_pipeline) {
+  KALDI_ASSERT(config_.gpu_feature_extract == true);
+  nvtxRangePushA("CopyBatchWaves");
+  // below we pack the waves into a single buffer for efficient transfer
+  // to the device
+
+  // first count the total number of elements and create a single large vector
+  int count = 0;
+  for (int i = first; i < tasks.size(); i++) {
+    count += tasks[i]->task_data->wave_samples->Dim();
+  }
+
+  // create a thread_local vector of pinned memory.
+  // wave data will be staged through this memory to get
+  // more efficient non-blocking transfers to the device.
+  thread_local Vector<BaseFloat> pinned_vector;
+
+  if (pinned_vector.Dim() < count) {
+    if (pinned_vector.Dim() != 0) {
+      cudaHostUnregister(pinned_vector.Data());
+    }
+    // allocate the array at 2x the requested size to reduce reallocations
+    pinned_vector.Resize(count * 2, kUndefined);
+    cudaHostRegister(pinned_vector.Data(),
+                     pinned_vector.Dim() * sizeof(BaseFloat), 0);
+  }
+
+  // We launch a thread for each task in order to get better host memory
+  // bandwidth
+  std::vector<std::future<void>> futures;  // for syncing
+
+  // vector copy function for threading below.
+  auto copy_vec = [](SubVector<BaseFloat> &dst,
+                     const SubVector<BaseFloat> &src) {
+    nvtxRangePushA("CopyVec");
+    dst.CopyFromVec(src);
+    nvtxRangePop();
+  };
+
+  // next launch threads to copy all waves for each task in parallel
+  count = 0;
+  for (int i = first; i < tasks.size(); i++) {
+    std::shared_ptr<TaskData> &task_data = tasks[i]->task_data;
+    SubVector<BaseFloat> wave(pinned_vector, count,
+                              task_data->wave_samples->Dim());
+    count += task_data->wave_samples->Dim();
+    futures.push_back(
+        work_pool_->enqueue(copy_vec, wave, *(task_data->wave_samples)));
+  }
+
+  // wait for the waves to be copied into place
+  for (int i = 0; i < futures.size(); i++) {
+    futures[i].get();
+  }
+
+  CuVector<BaseFloat> cu_waves(count, kUndefined);
+  // copy memory down asynchronously.  Vector copy functions are synchronous
+  // so we do it manually.  It is important for this to happen asynchronously
+  // to help hide the launch latency of smaller kernels that come in the
+  // future.
+  cudaMemcpyAsync(cu_waves.Data(), pinned_vector.Data(),
+                  cu_waves.Dim() * sizeof(BaseFloat), cudaMemcpyHostToDevice,
+                  cudaStreamPerThread);
+  nvtxRangePop();
+
+  nvtxRangePushA("ComputeBatchFeatures");
+  // extract features for each wave
+  count = 0;
+  for (int i = first; i < tasks.size(); i++) {
+    TaskState &task = *tasks[i];
+    std::shared_ptr<TaskData> &task_data = task.task_data;
+
+    CuSubVector<BaseFloat> cu_wave(cu_waves, count,
+                                   task_data->wave_samples->Dim());
+    count += task_data->wave_samples->Dim();
+    feature_pipeline.ComputeFeatures(cu_wave, task_data->sample_frequency,
+                                     &task_data->input_features,
+                                     &task_data->ivector_features);
+
+    int32 numFrames = task_data->input_features.NumRows();
+
+    if (numFrames == 0) {
+      // Make this a warning for now.  Need to check how this is handled
+      KALDI_WARN << "Empty audio file";
+    }
+  }
+  nvtxRangePop();
+}
+
+
+
 // Allocates decodables for tasks in the range of tasks[first,tasks.size())
 void BatchedThreadedNnet3CudaPipeline::AllocateDecodables(
     int32 first, std::vector<TaskState *> &tasks,
     std::vector<CudaDecodableInterface *> &decodables) {
   // Create mapped decodable here
   for (int i = first; i < tasks.size(); i++) {
-    CuMatrix<BaseFloat> &posteriors = tasks[i]->task_data->posteriors;
+    std::shared_ptr<TaskData> &task_data = tasks[i]->task_data;
+    CuMatrix<BaseFloat> &posteriors = task_data->posteriors;
    decodables.push_back(
        new DecodableCuMatrixMapped(*trans_model_, posteriors, 0));
   }
@@ -666,6 +788,8 @@ void BatchedThreadedNnet3CudaPipeline::ExecuteWorker(int threadId) {
   nnet3::NnetBatchComputer computer(config_.compute_opts, am_nnet_->GetNnet(),
                                     am_nnet_->Priors());
 
+  OnlineCudaFeaturePipeline feature_pipeline(config_.feature_opts);
+
   ChannelState channel_state;
 
   std::vector<TaskState *> tasks;  // The state for each decode
@@ -713,10 +837,12 @@ void BatchedThreadedNnet3CudaPipeline::ExecuteWorker(int threadId) {
 
       // New tasks are now in tasks[start,tasks.size())
       if (start != tasks.size()) {  // if there are new tasks
+        if (config_.gpu_feature_extract)
+          ComputeBatchFeatures(start, tasks, feature_pipeline);
         ComputeBatchNnet(computer, start, tasks);
         AllocateDecodables(start, tasks, decodables);
       }
-    }  // end if(tasks_front_!=tasks_back_)
+    }  // end if (tasks_front_ != tasks_back_)
 
     // check if there is no active work on this thread.
     // This can happen if another thread was assigned the work.
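
Finally, the host-side wave packing in ComputeBatchFeatures() fans one copy task per utterance out to the pool and joins on the returned futures before the device copy. The same fan-out/join pattern is sketched below, with std::async standing in for work_pool_->enqueue(), whose exact signature is not part of this diff:

    #include <cstring>
    #include <future>
    #include <utility>
    #include <vector>

    // Fan out one memcpy per (dst, src) job, then join on all futures.
    void ParallelCopy(
        const std::vector<std::pair<float *, const float *>> &jobs,
        size_t bytes_each) {
      std::vector<std::future<void>> futures;
      for (const auto &job : jobs)
        futures.push_back(std::async(std::launch::async, [job, bytes_each] {
          std::memcpy(job.first, job.second, bytes_each);
        }));
      for (auto &f : futures) f.get();  // wait for every copy to land
    }
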