cfc19a13dd041adb38307489eaf97ef1a1036e71
[dcpomatic.git] / src / lib / writer.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 #include <fstream>
21 #include <cerrno>
22 #include <libdcp/mono_picture_asset.h>
23 #include <libdcp/stereo_picture_asset.h>
24 #include <libdcp/sound_asset.h>
25 #include <libdcp/reel.h>
26 #include <libdcp/dcp.h>
27 #include <libdcp/cpl.h>
28 #include "writer.h"
29 #include "compose.hpp"
30 #include "film.h"
31 #include "ratio.h"
32 #include "log.h"
33 #include "dcp_video_frame.h"
34 #include "dcp_content_type.h"
35 #include "player.h"
36 #include "audio_mapping.h"
37 #include "config.h"
38 #include "job.h"
39 #include "cross.h"
40
41 #include "i18n.h"
42
43 using std::make_pair;
44 using std::pair;
45 using std::string;
46 using std::list;
47 using std::cout;
48 using std::stringstream;
49 using boost::shared_ptr;
50 using boost::weak_ptr;
51
52 int const Writer::_maximum_frames_in_memory = Config::instance()->num_local_encoding_threads() + 4;
53
54 Writer::Writer (shared_ptr<const Film> f, weak_ptr<Job> j)
55         : _film (f)
56         , _job (j)
57         , _first_nonexistant_frame (0)
58         , _thread (0)
59         , _finish (false)
60         , _queued_full_in_memory (0)
61         , _last_written_frame (-1)
62         , _last_written_eyes (EYES_RIGHT)
63         , _full_written (0)
64         , _fake_written (0)
65         , _repeat_written (0)
66         , _pushed_to_disk (0)
67 {
68         /* Remove any old DCP */
69         boost::filesystem::remove_all (_film->dir (_film->dcp_name ()));
70
71         shared_ptr<Job> job = _job.lock ();
72         assert (job);
73
74         job->sub (_("Checking existing image data"));
75         check_existing_picture_mxf ();
76
77         /* Create our picture asset in a subdirectory, named according to those
78            film's parameters which affect the video output.  We will hard-link
79            it into the DCP later.
80         */
81
82         if (_film->three_d ()) {
83                 _picture_asset.reset (new libdcp::StereoPictureAsset (_film->internal_video_mxf_dir (), _film->internal_video_mxf_filename ()));
84         } else {
85                 _picture_asset.reset (new libdcp::MonoPictureAsset (_film->internal_video_mxf_dir (), _film->internal_video_mxf_filename ()));
86         }
87
88         _picture_asset->set_edit_rate (_film->video_frame_rate ());
89         _picture_asset->set_size (fit_ratio_within (_film->container()->ratio(), _film->full_frame ()));
90
91         if (_film->encrypted ()) {
92                 _picture_asset->set_key (_film->key ());
93         }
94         
95         _picture_asset_writer = _picture_asset->start_write (_first_nonexistant_frame > 0);
96
97         _sound_asset.reset (new libdcp::SoundAsset (_film->directory (), _film->audio_mxf_filename ()));
98         _sound_asset->set_edit_rate (_film->video_frame_rate ());
99         _sound_asset->set_channels (_film->audio_channels ());
100         _sound_asset->set_sampling_rate (_film->audio_frame_rate ());
101
102         if (_film->encrypted ()) {
103                 _sound_asset->set_key (_film->key ());
104         }
105         
106         /* Write the sound asset into the film directory so that we leave the creation
107            of the DCP directory until the last minute.  Some versions of windows inexplicably
108            don't like overwriting existing files here, so try to remove it using boost.
109         */
110         boost::system::error_code ec;
111         boost::filesystem::remove_all (_film->file (_film->audio_mxf_filename ()), ec);
112         if (ec) {
113
114                 stringstream s;
115                 boost::filesystem::path p = _film->file (_film->audio_mxf_filename ());
116                 s << p << "\n"
117                   << "exists=" << boost::filesystem::exists (p) << "\n"
118                   << "file_size=" << boost::filesystem::file_size (p) << "\n"
119                   << "hard_link_count=" << boost::filesystem::hard_link_count (p) << "\n"
120                   << "is_directory=" << boost::filesystem::is_directory (p) << "\n"
121                   << "is_empty=" << boost::filesystem::is_empty (p) << "\n"
122                   << "is_other=" << boost::filesystem::is_other (p) << "\n"
123                   << "is_regular_file=" << boost::filesystem::is_regular_file (p) << "\n"
124                   << "last_write_time=" << boost::filesystem::last_write_time (p) << "\n"
125                   << "type=" << boost::filesystem::status (p).type () << "\n"
126                   << "permissions=" << boost::filesystem::status (p).permissions () << "\n";
127
128                 _film->log()->log (s.str ());
129                 
130                 _film->log()->log (
131                         String::compose (
132                                 "Could not remove existing audio MXF file %1 (%2)",
133                                 _film->file (_film->audio_mxf_filename ()),
134                                 ec.value ()
135                                 )
136                         );
137         }
138
139         _sound_asset_writer = _sound_asset->start_write ();
140
141         _thread = new boost::thread (boost::bind (&Writer::thread, this));
142
143         job->sub (_("Encoding image data"));
144 }
145
146 void
147 Writer::write (shared_ptr<const EncodedData> encoded, int frame, Eyes eyes)
148 {
149         boost::mutex::scoped_lock lock (_mutex);
150
151         while (_queued_full_in_memory > _maximum_frames_in_memory) {
152                 _full_condition.wait (lock);
153         }
154
155         QueueItem qi;
156         qi.type = QueueItem::FULL;
157         qi.encoded = encoded;
158         qi.frame = frame;
159
160         if (_film->three_d() && eyes == EYES_BOTH) {
161                 /* 2D material in a 3D DCP; fake the 3D */
162                 qi.eyes = EYES_LEFT;
163                 _queue.push_back (qi);
164                 ++_queued_full_in_memory;
165                 qi.eyes = EYES_RIGHT;
166                 _queue.push_back (qi);
167                 ++_queued_full_in_memory;
168         } else {
169                 qi.eyes = eyes;
170                 _queue.push_back (qi);
171                 ++_queued_full_in_memory;
172         }
173         
174         _empty_condition.notify_all ();
175 }
176
177 void
178 Writer::fake_write (int frame, Eyes eyes)
179 {
180         boost::mutex::scoped_lock lock (_mutex);
181
182         while (_queued_full_in_memory > _maximum_frames_in_memory) {
183                 _full_condition.wait (lock);
184         }
185         
186         FILE* ifi = fopen_boost (_film->info_path (frame, eyes), "r");
187         libdcp::FrameInfo info (ifi);
188         fclose (ifi);
189         
190         QueueItem qi;
191         qi.type = QueueItem::FAKE;
192         qi.size = info.size;
193         qi.frame = frame;
194         if (_film->three_d() && eyes == EYES_BOTH) {
195                 qi.eyes = EYES_LEFT;
196                 _queue.push_back (qi);
197                 qi.eyes = EYES_RIGHT;
198                 _queue.push_back (qi);
199         } else {
200                 qi.eyes = eyes;
201                 _queue.push_back (qi);
202         }
203
204         _empty_condition.notify_all ();
205 }
206
207 /** This method is not thread safe */
208 void
209 Writer::write (shared_ptr<const AudioBuffers> audio)
210 {
211         _sound_asset_writer->write (audio->data(), audio->frames());
212 }
213
214 /** This must be called from Writer::thread() with an appropriate lock held */
215 bool
216 Writer::have_sequenced_image_at_queue_head ()
217 {
218         if (_queue.empty ()) {
219                 return false;
220         }
221
222         _queue.sort ();
223
224         /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
225
226         if (_queue.front().eyes == EYES_BOTH) {
227                 /* 2D */
228                 return _queue.front().frame == (_last_written_frame + 1);
229         }
230
231         /* 3D */
232
233         if (_last_written_eyes == EYES_LEFT && _queue.front().frame == _last_written_frame && _queue.front().eyes == EYES_RIGHT) {
234                 return true;
235         }
236
237         if (_last_written_eyes == EYES_RIGHT && _queue.front().frame == (_last_written_frame + 1) && _queue.front().eyes == EYES_LEFT) {
238                 return true;
239         }
240
241         return false;
242 }
243
244 void
245 Writer::thread ()
246 try
247 {
248         while (1)
249         {
250                 boost::mutex::scoped_lock lock (_mutex);
251
252                 while (1) {
253                         
254                         if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
255                                 break;
256                         }
257
258                         TIMING (N_("writer sleeps with a queue of %1"), _queue.size());
259                         _empty_condition.wait (lock);
260                         TIMING (N_("writer wakes with a queue of %1"), _queue.size());
261                 }
262
263                 if (_finish && _queue.empty()) {
264                         return;
265                 }
266
267                 /* Write any frames that we can write; i.e. those that are in sequence. */
268                 while (have_sequenced_image_at_queue_head ()) {
269                         QueueItem qi = _queue.front ();
270                         _queue.pop_front ();
271                         if (qi.type == QueueItem::FULL && qi.encoded) {
272                                 --_queued_full_in_memory;
273                         }
274
275                         lock.unlock ();
276                         switch (qi.type) {
277                         case QueueItem::FULL:
278                         {
279                                 _film->log()->log (String::compose (N_("Writer FULL-writes %1 to MXF"), qi.frame));
280                                 if (!qi.encoded) {
281                                         qi.encoded.reset (new EncodedData (_film->j2c_path (qi.frame, qi.eyes, false)));
282                                 }
283
284                                 libdcp::FrameInfo fin = _picture_asset_writer->write (qi.encoded->data(), qi.encoded->size());
285                                 qi.encoded->write_info (_film, qi.frame, qi.eyes, fin);
286                                 _last_written[qi.eyes] = qi.encoded;
287                                 ++_full_written;
288                                 break;
289                         }
290                         case QueueItem::FAKE:
291                                 _film->log()->log (String::compose (N_("Writer FAKE-writes %1 to MXF"), qi.frame));
292                                 _picture_asset_writer->fake_write (qi.size);
293                                 _last_written[qi.eyes].reset ();
294                                 ++_fake_written;
295                                 break;
296                         case QueueItem::REPEAT:
297                         {
298                                 _film->log()->log (String::compose (N_("Writer REPEAT-writes %1 to MXF"), qi.frame));
299                                 libdcp::FrameInfo fin = _picture_asset_writer->write (
300                                         _last_written[qi.eyes]->data(),
301                                         _last_written[qi.eyes]->size()
302                                         );
303                                 
304                                 _last_written[qi.eyes]->write_info (_film, qi.frame, qi.eyes, fin);
305                                 ++_repeat_written;
306                                 break;
307                         }
308                         }
309                         lock.lock ();
310
311                         _last_written_frame = qi.frame;
312                         _last_written_eyes = qi.eyes;
313                         
314                         if (_film->length()) {
315                                 shared_ptr<Job> job = _job.lock ();
316                                 assert (job);
317                                 int total = _film->time_to_video_frames (_film->length ());
318                                 if (_film->three_d ()) {
319                                         /* _full_written and so on are incremented for each eye, so we need to double the total
320                                            frames to get the correct progress.
321                                         */
322                                         total *= 2;
323                                 }
324                                 job->set_progress (float (_full_written + _fake_written + _repeat_written) / total);
325                         }
326                 }
327
328                 while (_queued_full_in_memory > _maximum_frames_in_memory) {
329                         /* Too many frames in memory which can't yet be written to the stream.
330                            Write some FULL frames to disk.
331                         */
332
333                         /* Find one from the back of the queue */
334                         _queue.sort ();
335                         list<QueueItem>::reverse_iterator i = _queue.rbegin ();
336                         while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
337                                 ++i;
338                         }
339
340                         assert (i != _queue.rend());
341                         QueueItem qi = *i;
342
343                         ++_pushed_to_disk;
344                         
345                         lock.unlock ();
346
347                         _film->log()->log (
348                                 String::compose (
349                                         "Writer full (awaiting %1 [last eye was %2]); pushes %3 to disk",
350                                         _last_written_frame + 1,
351                                         _last_written_eyes, qi.frame)
352                                 );
353                         
354                         qi.encoded->write (_film, qi.frame, qi.eyes);
355                         lock.lock ();
356                         qi.encoded.reset ();
357                         --_queued_full_in_memory;
358                 }
359
360                 _full_condition.notify_all ();
361         }
362 }
363 catch (...)
364 {
365         store_current ();
366 }
367
368 void
369 Writer::finish ()
370 {
371         if (!_thread) {
372                 return;
373         }
374         
375         boost::mutex::scoped_lock lock (_mutex);
376         _finish = true;
377         _empty_condition.notify_all ();
378         _full_condition.notify_all ();
379         lock.unlock ();
380
381         _thread->join ();
382         rethrow ();
383         
384         delete _thread;
385         _thread = 0;
386
387         _picture_asset_writer->finalize ();
388         _sound_asset_writer->finalize ();
389         
390         int const frames = _last_written_frame + 1;
391
392         _picture_asset->set_duration (frames);
393
394         /* Hard-link the video MXF into the DCP */
395         boost::filesystem::path video_from;
396         video_from /= _film->internal_video_mxf_dir();
397         video_from /= _film->internal_video_mxf_filename();
398         
399         boost::filesystem::path video_to;
400         video_to /= _film->dir (_film->dcp_name());
401         video_to /= _film->video_mxf_filename ();
402
403         boost::system::error_code ec;
404         boost::filesystem::create_hard_link (video_from, video_to, ec);
405         if (ec) {
406                 /* hard link failed; copy instead */
407                 boost::filesystem::copy_file (video_from, video_to);
408                 _film->log()->log ("Hard-link failed; fell back to copying");
409         }
410
411         /* And update the asset */
412
413         _picture_asset->set_directory (_film->dir (_film->dcp_name ()));
414         _picture_asset->set_file_name (_film->video_mxf_filename ());
415
416         /* Move the audio MXF into the DCP */
417
418         boost::filesystem::path audio_to;
419         audio_to /= _film->dir (_film->dcp_name ());
420         audio_to /= _film->audio_mxf_filename ();
421         
422         boost::filesystem::rename (_film->file (_film->audio_mxf_filename ()), audio_to, ec);
423         if (ec) {
424                 throw FileError (
425                         String::compose (_("could not move audio MXF into the DCP (%1)"), ec.value ()), _film->file (_film->audio_mxf_filename ())
426                         );
427         }
428
429         _sound_asset->set_directory (_film->dir (_film->dcp_name ()));
430         _sound_asset->set_duration (frames);
431         
432         libdcp::DCP dcp (_film->dir (_film->dcp_name()));
433
434         shared_ptr<libdcp::CPL> cpl (
435                 new libdcp::CPL (
436                         _film->dir (_film->dcp_name()),
437                         _film->dcp_name(),
438                         _film->dcp_content_type()->libdcp_kind (),
439                         frames,
440                         _film->video_frame_rate ()
441                         )
442                 );
443         
444         dcp.add_cpl (cpl);
445
446         cpl->add_reel (shared_ptr<libdcp::Reel> (new libdcp::Reel (
447                                                          _picture_asset,
448                                                          _sound_asset,
449                                                          shared_ptr<libdcp::SubtitleAsset> ()
450                                                          )
451                                ));
452
453         shared_ptr<Job> job = _job.lock ();
454         assert (job);
455
456         job->sub (_("Computing image digest"));
457         _picture_asset->compute_digest (boost::bind (&Job::set_progress, job.get(), _1, false));
458
459         job->sub (_("Computing audio digest"));
460         _sound_asset->compute_digest (boost::bind (&Job::set_progress, job.get(), _1, false));
461
462         libdcp::XMLMetadata meta = Config::instance()->dcp_metadata ();
463         meta.set_issue_date_now ();
464         dcp.write_xml (_film->interop (), meta, _film->is_signed() ? make_signer () : shared_ptr<const libdcp::Signer> ());
465
466         _film->log()->log (
467                 String::compose (N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT; %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk)
468                 );
469 }
470
471 /** Tell the writer that frame `f' should be a repeat of the frame before it */
472 void
473 Writer::repeat (int f, Eyes e)
474 {
475         boost::mutex::scoped_lock lock (_mutex);
476
477         while (_queued_full_in_memory > _maximum_frames_in_memory) {
478                 _full_condition.wait (lock);
479         }
480         
481         QueueItem qi;
482         qi.type = QueueItem::REPEAT;
483         qi.frame = f;
484         if (_film->three_d() && e == EYES_BOTH) {
485                 qi.eyes = EYES_LEFT;
486                 _queue.push_back (qi);
487                 qi.eyes = EYES_RIGHT;
488                 _queue.push_back (qi);
489         } else {
490                 qi.eyes = e;
491                 _queue.push_back (qi);
492         }
493
494         _empty_condition.notify_all ();
495 }
496
497 bool
498 Writer::check_existing_picture_mxf_frame (FILE* mxf, int f, Eyes eyes)
499 {
500         /* Read the frame info as written */
501         FILE* ifi = fopen_boost (_film->info_path (f, eyes), "r");
502         if (!ifi) {
503                 _film->log()->log (String::compose ("Existing frame %1 has no info file", f));
504                 return false;
505         }
506         
507         libdcp::FrameInfo info (ifi);
508         fclose (ifi);
509         if (info.size == 0) {
510                 _film->log()->log (String::compose ("Existing frame %1 has no info file", f));
511                 return false;
512         }
513         
514         /* Read the data from the MXF and hash it */
515         dcpomatic_fseek (mxf, info.offset, SEEK_SET);
516         EncodedData data (info.size);
517         size_t const read = fread (data.data(), 1, data.size(), mxf);
518         if (read != static_cast<size_t> (data.size ())) {
519                 _film->log()->log (String::compose ("Existing frame %1 is incomplete", f));
520                 return false;
521         }
522         
523         string const existing_hash = md5_digest (data.data(), data.size());
524         if (existing_hash != info.hash) {
525                 _film->log()->log (String::compose ("Existing frame %1 failed hash check", f));
526                 return false;
527         }
528
529         return true;
530 }
531
532 void
533 Writer::check_existing_picture_mxf ()
534 {
535         /* Try to open the existing MXF */
536         boost::filesystem::path p;
537         p /= _film->internal_video_mxf_dir ();
538         p /= _film->internal_video_mxf_filename ();
539         FILE* mxf = fopen_boost (p, "rb");
540         if (!mxf) {
541                 _film->log()->log (String::compose ("Could not open existing MXF at %1 (errno=%2)", p.string(), errno));
542                 return;
543         }
544
545         int N = 0;
546         for (boost::filesystem::directory_iterator i (_film->info_dir ()); i != boost::filesystem::directory_iterator (); ++i) {
547                 ++N;
548         }
549
550         while (1) {
551
552                 shared_ptr<Job> job = _job.lock ();
553                 assert (job);
554
555                 job->set_progress (float (_first_nonexistant_frame) / N);
556
557                 if (_film->three_d ()) {
558                         if (!check_existing_picture_mxf_frame (mxf, _first_nonexistant_frame, EYES_LEFT)) {
559                                 break;
560                         }
561                         if (!check_existing_picture_mxf_frame (mxf, _first_nonexistant_frame, EYES_RIGHT)) {
562                                 break;
563                         }
564                 } else {
565                         if (!check_existing_picture_mxf_frame (mxf, _first_nonexistant_frame, EYES_BOTH)) {
566                                 break;
567                         }
568                 }
569
570                 _film->log()->log (String::compose ("Have existing frame %1", _first_nonexistant_frame));
571                 ++_first_nonexistant_frame;
572         }
573
574         fclose (mxf);
575 }
576
577 /** @param frame Frame index.
578  *  @return true if we can fake-write this frame.
579  */
580 bool
581 Writer::can_fake_write (int frame) const
582 {
583         /* We have to do a proper write of the first frame so that we can set up the JPEG2000
584            parameters in the MXF writer.
585         */
586         return (frame != 0 && frame < _first_nonexistant_frame);
587 }
588
589 bool
590 operator< (QueueItem const & a, QueueItem const & b)
591 {
592         if (a.frame != b.frame) {
593                 return a.frame < b.frame;
594         }
595
596         return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
597 }
598
599 bool
600 operator== (QueueItem const & a, QueueItem const & b)
601 {
602         return a.frame == b.frame && a.eyes == b.eyes;
603 }