summaryrefslogtreecommitdiffstats
path: root/updater/blockimg.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--updater/blockimg.cpp30
1 files changed, 26 insertions, 4 deletions
diff --git a/updater/blockimg.cpp b/updater/blockimg.cpp
index 0f08d17eb..d5c170473 100644
--- a/updater/blockimg.cpp
+++ b/updater/blockimg.cpp
@@ -149,7 +149,7 @@ static void allocate(size_t size, std::vector<uint8_t>& buffer) {
class RangeSinkWriter {
public:
RangeSinkWriter(int fd, const RangeSet& tgt)
- : fd_(fd), tgt_(tgt), next_range_(0), current_range_left_(0) {
+ : fd_(fd), tgt_(tgt), next_range_(0), current_range_left_(0), bytes_written_(0) {
CHECK_NE(tgt.size(), static_cast<size_t>(0));
};
@@ -201,9 +201,14 @@ class RangeSinkWriter {
written += write_now;
}
+ bytes_written_ += written;
return written;
}
+ size_t BytesWritten() const {
+ return bytes_written_;
+ }
+
private:
// The input data.
int fd_;
@@ -213,6 +218,8 @@ class RangeSinkWriter {
size_t next_range_;
// The number of bytes to write before moving to the next range.
size_t current_range_left_;
+ // Total bytes written by the writer.
+ size_t bytes_written_;
};
/**
@@ -238,6 +245,7 @@ struct NewThreadInfo {
ZipEntry entry;
RangeSinkWriter* writer;
+ bool receiver_available;
pthread_mutex_t mu;
pthread_cond_t cv;
@@ -274,9 +282,16 @@ static bool receive_new_data(const uint8_t* data, size_t size, void* cookie) {
}
static void* unzip_new_data(void* cookie) {
- NewThreadInfo* nti = static_cast<NewThreadInfo*>(cookie);
- ProcessZipEntryContents(nti->za, &nti->entry, receive_new_data, nti);
- return nullptr;
+ NewThreadInfo* nti = static_cast<NewThreadInfo*>(cookie);
+ ProcessZipEntryContents(nti->za, &nti->entry, receive_new_data, nti);
+
+ pthread_mutex_lock(&nti->mu);
+ nti->receiver_available = false;
+ if (nti->writer != nullptr) {
+ pthread_cond_broadcast(&nti->cv);
+ }
+ pthread_mutex_unlock(&nti->mu);
+ return nullptr;
}
static int ReadBlocks(const RangeSet& src, std::vector<uint8_t>& buffer, int fd) {
@@ -1133,6 +1148,12 @@ static int PerformCommandNew(CommandParameters& params) {
pthread_cond_broadcast(&params.nti.cv);
while (params.nti.writer != nullptr) {
+ if (!params.nti.receiver_available) {
+ LOG(ERROR) << "missing " << (tgt.blocks() * BLOCKSIZE - params.nti.writer->BytesWritten())
+ << " bytes of new data";
+ pthread_mutex_unlock(&params.nti.mu);
+ return -1;
+ }
pthread_cond_wait(&params.nti.cv, &params.nti.mu);
}
@@ -1361,6 +1382,7 @@ static Value* PerformBlockImageUpdate(const char* name, State* state,
if (params.canwrite) {
params.nti.za = za;
params.nti.entry = new_entry;
+ params.nti.receiver_available = true;
pthread_mutex_init(&params.nti.mu, nullptr);
pthread_cond_init(&params.nti.cv, nullptr);