2 Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 #include "compose.hpp"
26 #include "dcpomatic_log.h"
27 #include "dcp_video.h"
28 #include "dcp_content_type.h"
29 #include "audio_mapping.h"
33 #include "audio_buffers.h"
37 #include "reel_writer.h"
38 #include "text_content.h"
40 #include <dcp/locale_convert.h>
41 #include <boost/foreach.hpp>
49 /* OS X strikes again */
61 using boost::shared_ptr;
62 using boost::weak_ptr;
63 using boost::dynamic_pointer_cast;
64 using boost::optional;
66 using namespace dcpomatic;
// Constructs the Writer: makes one ReelWriter per reel period of the film, sizes
// _last_written to match, points the audio, subtitle and closed-caption reel
// cursors at the first reel, and checks the configured signer chain.
// Throws InvalidSignerError if the signer chain is not valid.
// NOTE(review): this listing has lost its short lines (braces, part of the
// initialiser list, and the declarations of `reel_index` and `reason`), and
// every remaining line carries a stray leading number; confirm against the
// complete file before editing.
68 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
72 , _queued_full_in_memory (0)
73 /* These will be reset to sensible values when J2KEncoder is created */
74 , _maximum_frames_in_memory (8)
75 , _maximum_queue_size (8)
// The job must still be alive here: it is passed to every ReelWriter below.
81 shared_ptr<Job> job = _job.lock ();
82 DCPOMATIC_ASSERT (job);
// Each ReelWriter gets its period, its index, the total reel count and the
// content summary for its period.
85 list<DCPTimePeriod> const reels = _film->reels ();
86 BOOST_FOREACH (DCPTimePeriod p, reels) {
87 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
// One LastWritten record per reel, indexed like _reels.
90 _last_written.resize (reels.size());
92 /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
93 and captions arrive to the Writer in sequence. This is not so for video.
95 _audio_reel = _reels.begin ();
96 _subtitle_reel = _reels.begin ();
// One reel cursor per closed-caption track reported by the film.
97 BOOST_FOREACH (DCPTextTrack i, _film->closed_caption_tracks()) {
98 _caption_reels[i] = _reels.begin ();
101 /* Check that the signer is OK */
103 if (!Config::instance()->signer_chain()->valid(&reason)) {
104 throw InvalidSignerError (reason);
// NOTE(review): the signature line for these statements is missing from this
// listing; presumably this is the body of the method that starts the writer.
// Runs Writer::thread on a new boost::thread bound to this object and, when
// DCPOMATIC_LINUX is defined, names that thread `writer`.
// NOTE(review): the matching #endif is not visible in this listing.
111 _thread = boost::thread (boost::bind(&Writer::thread, this));
112 #ifdef DCPOMATIC_LINUX
113 pthread_setname_np (_thread.native_handle(), "writer");
// NOTE(review): the signature line for this statement is missing from this
// listing; presumably this is the destructor body.
// Stops the writer thread, passing can_throw = false to terminate_thread().
119 terminate_thread (false);
122 /** Pass a video frame to the writer for writing to disk at some point.
123 * This method can be called with frames out of order.
124 * @param encoded JPEG2000-encoded data.
125 * @param frame Frame index within the DCP.
126 * @param eyes Eyes that this frame image is for.
// Queues the frame as one FULL item, or as two items when the film is 3D and
// eyes is EYES_BOTH, then wakes whatever waits on _empty_condition.  Blocks
// while more than _maximum_frames_in_memory full frames are held in memory.
// NOTE(review): this listing has lost its short lines (return type, braces, the
// declaration of `qi` and any qi.eyes assignment other than EYES_RIGHT);
// confirm against the complete file before editing.
129 Writer::write (Data encoded, Frame frame, Eyes eyes)
// Hold _state_mutex while examining and changing the queue and its counter.
131 boost::mutex::scoped_lock lock (_state_mutex);
133 while (_queued_full_in_memory > _maximum_frames_in_memory) {
134 /* There are too many full frames in memory; wake the main writer thread and
135 wait until it sorts everything out */
136 _empty_condition.notify_all ();
137 _full_condition.wait (lock);
// The item is addressed by reel index plus frame number relative to the start
// of that reel, not by the DCP-wide frame index.
141 qi.type = QueueItem::FULL;
142 qi.encoded = encoded;
143 qi.reel = video_reel (frame);
144 qi.frame = frame - _reels[qi.reel].start ();
146 if (_film->three_d() && eyes == EYES_BOTH) {
147 /* 2D material in a 3D DCP; fake the 3D */
// Two items are pushed here and _queued_full_in_memory is incremented once per
// push, so a faked-3D frame counts twice.
149 _queue.push_back (qi);
150 ++_queued_full_in_memory;
151 qi.eyes = EYES_RIGHT;
152 _queue.push_back (qi);
153 ++_queued_full_in_memory;
// Otherwise a single item is pushed and counted.
156 _queue.push_back (qi);
157 ++_queued_full_in_memory;
160 /* Now there's something to do: wake anything wait()ing on _empty_condition */
161 _empty_condition.notify_all ();
165 Writer::can_repeat (Frame frame) const
167 return frame > _reels[video_reel(frame)].start();
170 /** Repeat the last frame that was written to a reel as a new frame.
171 * @param frame Frame index within the DCP of the new (repeated) frame.
172 * @param eyes Eyes that this repeated frame image is for.
// Queues one REPEAT item, or two when the film is 3D and eyes is EYES_BOTH,
// then wakes whatever waits on _empty_condition.  REPEAT items carry no encoded
// data and do not change _queued_full_in_memory.
// NOTE(review): this listing has lost its short lines (return type, braces, the
// declaration of `qi` and any qi.eyes assignment other than EYES_RIGHT).
175 Writer::repeat (Frame frame, Eyes eyes)
// Hold _state_mutex while examining and changing the queue.
177 boost::mutex::scoped_lock lock (_state_mutex);
// Only wait when the writer thread can actually make progress, i.e. when the
// head of the queue is the next item in sequence.
179 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
180 /* The queue is too big, and the main writer thread can run and fix it, so
181 wake it and wait until it has done.
183 _empty_condition.notify_all ();
184 _full_condition.wait (lock);
// Reel index plus frame number relative to the start of that reel.
188 qi.type = QueueItem::REPEAT;
189 qi.reel = video_reel (frame);
190 qi.frame = frame - _reels[qi.reel].start ();
191 if (_film->three_d() && eyes == EYES_BOTH) {
// Two pushes in this branch, the second one for EYES_RIGHT.
193 _queue.push_back (qi);
194 qi.eyes = EYES_RIGHT;
195 _queue.push_back (qi);
// Otherwise a single push.
198 _queue.push_back (qi);
201 /* Now there's something to do: wake anything wait()ing on _empty_condition */
202 _empty_condition.notify_all ();
// Queues a FAKE item for a frame: no encoded data is passed in, only the size
// that ReelWriter::read_frame_info() reports for it from the info file of the
// reel.  One item is pushed, or two when the film is 3D and eyes is EYES_BOTH,
// and then whatever waits on _empty_condition is woken.
// frame is a frame index that video_reel() maps to a reel; eyes is passed to
// read_frame_info().
// NOTE(review): this listing has lost its short lines (return type, braces, the
// declaration of `qi`, whatever sets qi.reel, and any qi.eyes assignment other
// than EYES_RIGHT); confirm against the complete file before editing.
206 Writer::fake_write (Frame frame, Eyes eyes)
// Hold _state_mutex while examining and changing the queue.
208 boost::mutex::scoped_lock lock (_state_mutex);
// Only wait when the writer thread can actually make progress.
210 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
211 /* The queue is too big, and the main writer thread can run and fix it, so
212 wake it and wait until it has done.
214 _empty_condition.notify_all ();
215 _full_condition.wait (lock);
// Work in reel-relative frame numbers from here on.
218 size_t const reel = video_reel (frame);
219 Frame const frame_in_reel = frame - _reels[reel].start ();
222 qi.type = QueueItem::FAKE;
// The info file handle is requested with `true` as its second argument.
// NOTE(review): the meaning of that flag is not visible here.
225 shared_ptr<InfoFileHandle> info_file = _film->info_file_handle(_reels[reel].period(), true);
226 qi.size = _reels[reel].read_frame_info(info_file, frame_in_reel, eyes).size;
230 qi.frame = frame_in_reel;
231 if (_film->three_d() && eyes == EYES_BOTH) {
// Two pushes in this branch, the second one for EYES_RIGHT.
233 _queue.push_back (qi);
234 qi.eyes = EYES_RIGHT;
235 _queue.push_back (qi);
// Otherwise a single push.
238 _queue.push_back (qi);
241 /* Now there's something to do: wake anything wait()ing on _empty_condition */
242 _empty_condition.notify_all ();
245 /** Write some audio frames to the DCP.
246 * @param audio Audio data.
247 * @param time Time of this data within the DCP.
248 * This method is not thread safe.
// Audio is written to the reel that _audio_reel points at; a buffer that
// crosses the end of that reel is split and the remainder carried forward.
// NOTE(review): this listing has lost its short lines (return type, braces, the
// declaration of `t`, the loop header, the statements that advance _audio_reel
// and the early return); confirm against the complete file before editing.
251 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
253 DCPOMATIC_ASSERT (audio);
// afr is the audio frame rate of the film, used for every time/frame conversion below.
255 int const afr = _film->audio_frame_rate();
// end is the DCP time just after the last audio frame in the buffer.
257 DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
259 /* The audio we get might span a reel boundary, and if so we have to write it in bits */
264 if (_audio_reel == _reels.end ()) {
265 /* This audio is off the end of the last reel; ignore it */
269 if (end <= _audio_reel->period().to) {
270 /* Easy case: we can write all the audio to this reel */
271 _audio_reel->write (audio);
273 } else if (_audio_reel->period().to <= t) {
274 /* This reel is entirely before the start of our audio; just skip the reel */
277 /* This audio is over a reel boundary; split the audio into two and write the first part */
// part_lengths[0] is the time left in the current reel from t,
// part_lengths[1] is the time that falls after the end of the current reel.
278 DCPTime part_lengths[2] = {
279 _audio_reel->period().to - t,
280 end - _audio_reel->period().to
// NOTE(review): both lengths are rounded up with frames_ceil(); if a reel
// boundary were not aligned to a whole audio frame the two counts could
// presumably add up to more than audio->frames() -- confirm.
283 Frame part_frames[2] = {
284 part_lengths[0].frames_ceil(afr),
285 part_lengths[1].frames_ceil(afr)
// First part: part_frames[0] frames starting at offset 0, written to the current reel.
288 if (part_frames[0]) {
289 shared_ptr<AudioBuffers> part (new AudioBuffers(audio, part_frames[0], 0));
290 _audio_reel->write (part);
// Second part: `audio` is replaced by the part_frames[1] frames that start at
// offset part_frames[0].
293 if (part_frames[1]) {
294 audio.reset (new AudioBuffers(audio, part_frames[1], part_frames[0]));
// Move the current time on by the length of the first part.
300 t += part_lengths[0];
306 /** Caller must hold a lock on _state_mutex */
// Reports whether the item at the front of the queue is the one that should be
// written next for its reel, as judged by LastWritten::next() on the record for
// that reel.
// NOTE(review): lines are missing from this listing between the empty-queue
// check and the read of _queue.front() (the body of the `if`, and possibly
// more); confirm against the complete file before editing.
308 Writer::have_sequenced_image_at_queue_head ()
310 if (_queue.empty ()) {
315 QueueItem const & f = _queue.front();
316 return _last_written[f.reel].next(f);
// Tests whether the queue item `qi` follows on from what this record holds in
// _frame and _eyes.
// NOTE(review): all return statements but one, and the braces, are missing from
// this listing; the results of the two 3D conditions below are not visible.
321 Writer::LastWritten::next (QueueItem qi) const
// Item marked EYES_BOTH: it is next when its frame is exactly _frame + 1.
323 if (qi.eyes == EYES_BOTH) {
325 return qi.frame == (_frame + 1);
// After a left eye: the right eye of the same frame.
330 if (_eyes == EYES_LEFT && qi.frame == _frame && qi.eyes == EYES_RIGHT) {
// After a right eye: the left eye of the following frame.
334 if (_eyes == EYES_RIGHT && qi.frame == (_frame + 1) && qi.eyes == EYES_LEFT) {
// Called by the writer thread with each item it takes from the queue.
// NOTE(review): the body of this method is missing from this listing;
// presumably it records the frame and eyes of `qi` -- confirm against the
// complete file.
343 Writer::LastWritten::update (QueueItem qi)
// NOTE(review): the signature line for this body is missing from this listing;
// presumably this is Writer::thread, the function that the writer thread runs.
// Short lines (braces, loop headers, break/return statements, the switch
// header, queue pops, lock/unlock calls and counter increments) are also
// missing, so read the flow below with care and confirm against the complete
// file before editing.
//
// Visible behaviour: wait on _empty_condition until there is work; write every
// queue item that is next in sequence to its ReelWriter; and when too many full
// frames are held in memory, write encoded frames from the back of the queue
// to disk.
356 boost::mutex::scoped_lock lock (_state_mutex);
// There is work when finishing was requested, when too many full frames are in
// memory, or when the head of the queue is next in sequence.
360 if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
361 /* We've got something to do: go and do it */
365 /* Nothing to do: wait until something happens which may indicate that we do */
366 LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
367 _empty_condition.wait (lock);
368 LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
// Finishing with an empty queue.
371 if (_finish && _queue.empty()) {
375 /* We stop here if we have been asked to finish, and if either the queue
376 is empty or we do not have a sequenced image at its head (if this is the
377 case we will never terminate as no new frames will be sent once
// NOTE(review): if the previous `if` returns, the `|| _queue.empty()` part of
// this condition looks redundant -- confirm.
380 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
381 /* (Hopefully temporarily) log anything that was not written */
382 if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
383 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
384 BOOST_FOREACH (QueueItem const& i, _queue) {
385 if (i.type == QueueItem::FULL) {
386 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i.frame, (int) i.eyes);
// NOTE(review): presumably the else branch of the FULL test; if so, REPEAT
// items would also be logged as type FAKE -- confirm.
388 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i.size, i.frame, (int) i.eyes);
395 /* Write any frames that we can write; i.e. those that are in sequence. */
396 while (have_sequenced_image_at_queue_head ()) {
// Take a copy of the head item and record it as the last written for its reel.
397 QueueItem qi = _queue.front ();
398 _last_written[qi.reel].update (qi);
// Only FULL items that still hold their encoded data count towards
// _queued_full_in_memory.
400 if (qi.type == QueueItem::FULL && qi.encoded) {
401 --_queued_full_in_memory;
406 ReelWriter& reel = _reels[qi.reel];
409 case QueueItem::FULL:
410 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
// Loads the encoded data from the j2c file of the frame.
// NOTE(review): presumably guarded by a test that qi.encoded is empty, i.e. the
// frame was pushed to disk earlier; the guard line is not visible.
412 qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
414 reel.write (qi.encoded, qi.frame, qi.eyes);
417 case QueueItem::FAKE:
418 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
419 reel.fake_write (qi.size);
422 case QueueItem::REPEAT:
423 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
424 reel.repeat_write (qi.frame, qi.eyes);
// Wake producers that are blocked on _full_condition in write(), repeat() or
// fake_write().
430 _full_condition.notify_all ();
433 while (_queued_full_in_memory > _maximum_frames_in_memory) {
434 /* Too many frames in memory which can't yet be written to the stream.
435 Write some FULL frames to disk.
438 /* Find one from the back of the queue */
// Walk backwards to the last FULL item that still holds encoded data.
440 list<QueueItem>::reverse_iterator i = _queue.rbegin ();
441 while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
445 DCPOMATIC_ASSERT (i != _queue.rend());
447 /* For the log message below */
448 int const awaiting = _last_written[_queue.front().reel].frame() + 1;
451 /* i is valid here, even though we don't hold a lock on the mutex,
452 since list iterators are unaffected by insertion and only this
453 thread could erase the last item in the list.
456 LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
// The two j2c_path() calls differ only in their last argument (true, then false).
458 i->encoded->write_via_temp (
459 _film->j2c_path (i->reel, i->frame, i->eyes, true),
460 _film->j2c_path (i->reel, i->frame, i->eyes, false)
// One fewer full frame in memory; wake producers blocked on _full_condition.
465 --_queued_full_in_memory;
466 _full_condition.notify_all ();
// Wakes every thread waiting on _empty_condition or _full_condition as part of
// stopping the writer thread.
// NOTE(review): most of this method is missing from this listing (return type,
// braces, the body of the joinable test, and whatever sets the finish flag,
// joins the thread and uses `can_throw`); confirm against the complete file.
476 Writer::terminate_thread (bool can_throw)
478 boost::mutex::scoped_lock lock (_state_mutex);
// Checked with _state_mutex held.
479 if (!_thread.joinable()) {
484 _empty_condition.notify_all ();
485 _full_condition.notify_all ();
// NOTE(review): the signature line for this body is missing from this listing;
// presumably this is Writer::finish, which a comment elsewhere in this file
// refers to.  Short lines (braces, loop bodies, parts of the CPL construction,
// the declaration of `reason`, the thread-pool shutdown and the log call near
// the end) are also missing; confirm against the complete file before editing.
//
// Visible behaviour: stop the writer thread, compute the reel digests on a pool
// of threads, build the CPL from the reels, set its metadata, validate the
// signer, write the DCP XML and finally write the cover sheet.
498 if (!_thread.joinable()) {
502 LOG_GENERAL_NC ("Terminating writer thread");
// can_throw = true here.
504 terminate_thread (true);
506 LOG_GENERAL_NC ("Finishing ReelWriters");
508 BOOST_FOREACH (ReelWriter& i, _reels) {
512 LOG_GENERAL_NC ("Writing XML");
514 dcp::DCP dcp (_film->dir (_film->dcp_name()));
516 shared_ptr<dcp::CPL> cpl (
519 _film->dcp_content_type()->libdcp_kind ()
525 /* Calculate digests for each reel in parallel */
// NOTE(review): `job` is dereferenced on the next line with no check that
// _job.lock() returned a live pointer, whereas the constructor asserts it.
// job.get() is also handed to set_digest_progress(), which dereferences it.
527 shared_ptr<Job> job = _job.lock ();
528 job->sub (_("Computing digests"));
530 boost::asio::io_service service;
531 boost::thread_group pool;
// The work object is created before any pool thread calls run().
// NOTE(review): presumably this keeps run() from returning while no task is
// queued -- see the Boost.Asio documentation.
533 shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
// At least one thread, whatever the configured value is.
535 int const threads = max (1, Config::instance()->master_encoding_threads ());
537 for (int i = 0; i < threads; ++i) {
538 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
// One calculate_digests task per reel; each reports through set_digest_progress().
541 BOOST_FOREACH (ReelWriter& i, _reels) {
542 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
543 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
550 /* Add reels to CPL */
552 BOOST_FOREACH (ReelWriter& i, _reels) {
553 cpl->add (i.create_reel (_reel_assets, _fonts));
// Creator and issuer come from Config; when empty they fall back to a string
// built from the DCP-o-matic version and git commit.
556 dcp::XMLMetadata meta;
557 meta.annotation_text = cpl->annotation_text ();
558 meta.creator = Config::instance()->dcp_creator ();
559 if (meta.creator.empty ()) {
560 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
562 meta.issuer = Config::instance()->dcp_issuer ();
563 if (meta.issuer.empty ()) {
564 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
566 meta.set_issue_date_now ();
568 cpl->set_metadata (meta);
569 cpl->set_ratings (vector_to_list(_film->ratings()));
570 cpl->set_content_version_label_text (_film->content_version());
572 shared_ptr<const dcp::CertificateChain> signer;
573 signer = Config::instance()->signer_chain ();
574 /* We did check earlier, but check again here to be on the safe side */
576 if (!signer->valid (&reason)) {
577 throw InvalidSignerError (reason);
// The standard is INTEROP when the film says so, SMPTE otherwise.
580 dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
// Arguments of a log call whose first line is not visible: counts of FULL,
// FAKE and REPEAT writes and of frames pushed to disk.
583 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
586 write_cover_sheet ();
590 Writer::write_cover_sheet ()
592 boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
593 FILE* f = fopen_boost (cover, "w");
595 throw OpenFileError (cover, errno, OpenFileError::WRITE);
598 string text = Config::instance()->cover_sheet ();
599 boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
600 boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
601 boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
602 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
604 optional<string> subtitle_language;
605 BOOST_FOREACH (shared_ptr<Content> i, _film->content()) {
606 BOOST_FOREACH (shared_ptr<TextContent> j, i->text) {
607 if (j->type() == TEXT_OPEN_SUBTITLE && j->use()) {
608 subtitle_language = j->language ();
612 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", subtitle_language.get_value_or("None"));
614 boost::uintmax_t size = 0;
616 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
617 i != boost::filesystem::recursive_directory_iterator();
619 if (boost::filesystem::is_regular_file (i->path ())) {
620 size += boost::filesystem::file_size (i->path ());
624 if (size > (1000000000L)) {
625 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
627 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
630 pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
631 string description = String::compose("%1.%2", ch.first, ch.second);
633 if (description == "0.0") {
634 description = _("None");
635 } else if (description == "1.0") {
636 description = _("Mono");
637 } else if (description == "2.0") {
638 description = _("Stereo");
640 boost::algorithm::replace_all (text, "$AUDIO", description);
643 _film->length().split (_film->video_frame_rate(), h, m, s, fr);
645 if (h == 0 && m == 0) {
646 length = String::compose("%1s", s);
647 } else if (h == 0 && m > 0) {
648 length = String::compose("%1m%2s", m, s);
649 } else if (h > 0 && m > 0) {
650 length = String::compose("%1h%2m%3s", h, m, s);
653 boost::algorithm::replace_all (text, "$LENGTH", length);
655 checked_fwrite (text.c_str(), text.length(), f, cover);
659 /** @param frame Frame index within the whole DCP.
660 * @return true if we can fake-write this frame.
663 Writer::can_fake_write (Frame frame) const
665 if (_film->encrypted()) {
666 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
670 /* We have to do a proper write of the first frame so that we can set up the JPEG2000
671 parameters in the asset writer.
674 ReelWriter const & reel = _reels[video_reel(frame)];
676 /* Make frame relative to the start of the reel */
677 frame -= reel.start ();
678 return (frame != 0 && frame < reel.first_nonexistant_frame());
681 /** @param track Closed caption track if type == TEXT_CLOSED_CAPTION */
// Passes some text to the ReelWriter whose period contains the start of
// `period`, using and advancing the reel cursor that belongs to the text type:
// _subtitle_reel for open subtitles, or the _caption_reels entry for `track`.
// NOTE(review): this listing has lost its short lines (return type, braces, the
// switch header, break and default labels, and the statement that advances the
// cursor inside the while loop); confirm against the complete file.
683 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
// Pointer to the cursor to use, so that advancing it below updates the member.
685 vector<ReelWriter>::iterator* reel = 0;
688 case TEXT_OPEN_SUBTITLE:
689 reel = &_subtitle_reel;
691 case TEXT_CLOSED_CAPTION:
// A closed caption must name a track that already has a cursor.
692 DCPOMATIC_ASSERT (track);
693 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
694 reel = &_caption_reels[*track];
// Presumably the default case: any other text type is a programming error.
697 DCPOMATIC_ASSERT (false);
700 DCPOMATIC_ASSERT (*reel != _reels.end());
// Skip reels that end at or before the start of this text.
701 while ((*reel)->period().to <= period.from) {
703 DCPOMATIC_ASSERT (*reel != _reels.end());
706 (*reel)->write (text, type, track, period);
// Adds fonts from `fonts` to _fonts.
// NOTE(review): the lines that compare each incoming font with the ones already
// held, and the test that guards the push_back, are missing from this listing;
// how two fonts are judged equal cannot be seen here.
710 Writer::write (list<shared_ptr<Font> > fonts)
712 /* Just keep a list of unique fonts and we'll deal with them in ::finish */
714 BOOST_FOREACH (shared_ptr<Font> i, fonts) {
// Inner loop over the fonts that are already held.
716 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
723 _fonts.push_back (i);
729 operator< (QueueItem const & a, QueueItem const & b)
731 if (a.reel != b.reel) {
732 return a.reel < b.reel;
735 if (a.frame != b.frame) {
736 return a.frame < b.frame;
739 return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
743 operator== (QueueItem const & a, QueueItem const & b)
745 return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
749 Writer::set_encoder_threads (int threads)
751 boost::mutex::scoped_lock lm (_state_mutex);
752 _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
753 _maximum_queue_size = threads * 16;
757 Writer::write (ReferencedReelAsset asset)
759 _reel_assets.push_back (asset);
763 Writer::video_reel (int frame) const
765 DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
767 while (i < _reels.size() && !_reels[i].period().contains (t)) {
771 DCPOMATIC_ASSERT (i < _reels.size ());
776 Writer::set_digest_progress (Job* job, float progress)
778 boost::mutex::scoped_lock lm (_digest_progresses_mutex);
780 _digest_progresses[boost::this_thread::get_id()] = progress;
781 float min_progress = FLT_MAX;
782 for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
783 min_progress = min (min_progress, i->second);
786 job->set_progress (min_progress);