Compare commits

...

4 Commits

  1. 282
      include/cache.h
  2. 27
      sources/COPERNICUS.cpp
  3. 14
      src/cache.cpp
  4. 8
      src/copcat.cpp

282
include/cache.h

@ -1,14 +1,149 @@
#pragma once #pragma once
#include "merrors.h" #include "GPL.h"
#include <functional>
#include <libpq-fe.h> #include <libpq-fe.h>
#include <sqlite3.h> #include <sqlite3.h>
#include <time.h> #include <time.h>
#include <variant> #include <variant>
using michlib::GPL;
using michlib::int_cast; using michlib::int_cast;
using michlib::MString; using michlib::MString;
using michlib::pointer_cast; using michlib::pointer_cast;
class SQLiteConnection
{
public:
using DBType = sqlite3*;
using FuncType = std::function<void(DBType)>;
private:
static DBType db;
static size_t count;
static std::vector<FuncType> destructs;
public:
SQLiteConnection()
{
count++;
if(db == nullptr)
{
MString oldprefix = GPL.UsePrefix("SQLITE");
MString name = GPL.ParameterSValue("db", "");
GPL.UsePrefix(oldprefix);
auto ret = sqlite3_open(name.Buf(), &db);
if(ret != SQLITE_OK)
{
sqlite3_close(db);
db = nullptr;
}
}
}
SQLiteConnection([[maybe_unused]] const SQLiteConnection& sq): SQLiteConnection() {}
SQLiteConnection(SQLiteConnection&&) = default;
SQLiteConnection& operator=([[maybe_unused]] const SQLiteConnection& sq)
{
*this = {};
return *this;
}
SQLiteConnection& operator=(SQLiteConnection&&) = default;
~SQLiteConnection()
{
if(count == 0) michlib::errmessage("Destructor of SQLiteConnection called on count==0");
if(count > 1)
count--;
else
{
count = 0;
if(db != nullptr)
{
for(const auto& f: destructs) f(db);
sqlite3_close(db);
}
db = nullptr;
}
}
static void AddDestructor(FuncType&& f) { destructs.emplace_back(std::move(f)); }
operator DBType() const { return db; }
static DBType GetDB() { return db; }
explicit operator bool() const { return db != nullptr; }
};
class PostgreSQLConnection
{
public:
using DBType = PGconn*;
using FuncType = std::function<void(DBType)>;
private:
static DBType conn;
static size_t count;
static std::vector<FuncType> destructs;
public:
PostgreSQLConnection()
{
count++;
if(conn == nullptr)
{
MString oldprefix = GPL.UsePrefix("POSTGRES");
MString name = GPL.ParameterSValue("connection", "");
GPL.UsePrefix(oldprefix);
conn = PQconnectdb(name.Buf());
if(PQstatus(conn) != CONNECTION_OK)
{
michlib::errmessage(PQerrorMessage(conn));
PQfinish(conn);
conn = nullptr;
}
}
}
PostgreSQLConnection([[maybe_unused]] const PostgreSQLConnection& pq): PostgreSQLConnection() {}
PostgreSQLConnection(PostgreSQLConnection&&) = default;
PostgreSQLConnection& operator=([[maybe_unused]] const PostgreSQLConnection& pq)
{
*this = {};
return *this;
}
PostgreSQLConnection& operator=(PostgreSQLConnection&&) = default;
~PostgreSQLConnection()
{
if(count == 0) michlib::errmessage("Destructor of PostgreSQLConnection called on count==0");
if(count > 1)
count--;
else
{
count = 0;
if(conn != nullptr)
{
for(const auto& f: destructs) f(conn);
PQfinish(conn);
}
conn = nullptr;
}
}
static void AddDestructor(FuncType&& f) { destructs.emplace_back(std::move(f)); }
operator DBType() const { return conn; }
static DBType GetDB() { return conn; }
explicit operator bool() const { return conn != nullptr; }
};
class GenericCache class GenericCache
{ {
public: public:
@ -27,44 +162,50 @@ class FakeCache: public GenericCache
class SQLiteCache: public GenericCache class SQLiteCache: public GenericCache
{ {
sqlite3* db = nullptr; static bool regdest;
SQLiteConnection db;
public: public:
bool Init(const MString& name) bool Init()
{ {
Close(); if(!db) return false;
auto ret = sqlite3_open(name.Buf(), &db);
if(ret != SQLITE_OK) if(!regdest)
{
Close();
return false;
}
// Create table
sqlite3_stmt* sqst;
int i;
i = sqlite3_prepare_v2(db,
"CREATE TABLE IF NOT EXISTS `cache`('key' TEXT PRIMARY KEY ON CONFLICT REPLACE NOT NULL ON CONFLICT FAIL, 'value' BLOB NOT NULL ON CONFLICT FAIL, "
"'exptime' INTEGER NOT NULL ON CONFLICT FAIL) WITHOUT ROWID, STRICT;",
-1, &sqst, 0);
i = sqlite3_step(sqst);
if(i != SQLITE_DONE)
{ {
// Create table
sqlite3_stmt* sqst;
int i;
i = sqlite3_prepare_v2(db,
"CREATE TABLE IF NOT EXISTS `cache`('key' TEXT PRIMARY KEY ON CONFLICT REPLACE NOT NULL ON CONFLICT FAIL, 'value' BLOB NOT NULL ON CONFLICT FAIL, "
"'exptime' INTEGER NOT NULL ON CONFLICT FAIL) WITHOUT ROWID, STRICT;",
-1, &sqst, 0);
i = sqlite3_step(sqst);
if(i != SQLITE_DONE)
{
sqlite3_finalize(sqst);
return false;
}
sqlite3_finalize(sqst); sqlite3_finalize(sqst);
Close(); sqlite3_busy_timeout(db, 1000);
return false;
db.AddDestructor(
[](SQLiteConnection::DBType db)
{
sqlite3_stmt* sqst = nullptr;
int i = SQLITE_OK;
if(i == SQLITE_OK) i = sqlite3_prepare_v2(db, "DELETE from `cache` WHERE exptime<?1;", -1, &sqst, 0);
if(i == SQLITE_OK) i = sqlite3_bind_int64(sqst, 1, time(nullptr));
if(i == SQLITE_OK) i = sqlite3_step(sqst);
sqlite3_finalize(sqst);
});
regdest = true;
} }
sqlite3_finalize(sqst);
sqlite3_busy_timeout(db, 1000);
return true; return true;
} }
void Close()
{
if(db != nullptr) sqlite3_close(db);
db = nullptr;
}
virtual bool Put(const MString& key, const MString& value, size_t ttl) const override virtual bool Put(const MString& key, const MString& value, size_t ttl) const override
{ {
if(!*this) return false; if(!*this) return false;
@ -107,26 +248,16 @@ class SQLiteCache: public GenericCache
return {"", false}; return {"", false};
} }
virtual ~SQLiteCache() override virtual ~SQLiteCache() override = default;
{
if(!*this) return;
sqlite3_stmt* sqst = nullptr;
int i = SQLITE_OK;
if(i == SQLITE_OK) i = sqlite3_prepare_v2(db, "DELETE from `cache` WHERE exptime<?1;", -1, &sqst, 0);
if(i == SQLITE_OK) i = sqlite3_bind_int64(sqst, 1, time(nullptr));
if(i == SQLITE_OK) i = sqlite3_step(sqst);
sqlite3_finalize(sqst);
sqlite3_close_v2(db);
}
explicit operator bool() const { return db != nullptr; } explicit operator bool() const { return db != nullptr; }
}; };
class PostgreSQLCache: public GenericCache class PostgreSQLCache: public GenericCache
{ {
PGconn* conn = nullptr; static bool regdest;
PostgreSQLConnection conn;
bool CheckCon() const bool CheckCon() const
{ {
@ -148,41 +279,43 @@ class PostgreSQLCache: public GenericCache
} }
public: public:
bool Init(const MString& name) bool Init()
{ {
Close(); if(!conn) return false;
conn = PQconnectdb(name.Buf());
if(PQstatus(conn) != CONNECTION_OK) if(!regdest)
{ {
michlib::errmessage(PQerrorMessage(conn)); // Create table
Close();
return false;
}
// Create table
if(false)
{
auto* res = PQexec(conn, "CREATE TABLE IF NOT EXISTS cache(key TEXT PRIMARY KEY NOT NULL, value BYTEA, exptime TIMESTAMP(0) NOT NULL);");
if(PQresultStatus(res) != PGRES_COMMAND_OK)
{ {
michlib::errmessage(PQresStatus(PQresultStatus(res))); auto* res = PQexec(conn, "SET client_min_messages=WARNING;");
michlib::errmessage(PQerrorMessage(conn));
PQclear(res); PQclear(res);
Close();
} }
else {
auto* res = PQexec(conn, "CREATE TABLE IF NOT EXISTS cache(key TEXT PRIMARY KEY NOT NULL, value BYTEA, exptime TIMESTAMP(0) NOT NULL);");
if(PQresultStatus(res) != PGRES_COMMAND_OK)
{
michlib::errmessage(PQresStatus(PQresultStatus(res)));
michlib::errmessage(PQerrorMessage(conn));
}
PQclear(res); PQclear(res);
}
{
auto* res = PQexec(conn, "SET client_min_messages=NOTICE;");
PQclear(res);
}
conn.AddDestructor(
[](PostgreSQLConnection::DBType conn)
{
auto* res = PQexec(conn, "DELETE FROM cache WHERE exptime<localtimestamp;");
PQclear(res);
});
regdest = true;
} }
return true; return true;
} }
void Close()
{
if(conn != nullptr) PQfinish(conn);
conn = nullptr;
}
virtual bool Put(const MString& key, const MString& value, size_t ttl) const override virtual bool Put(const MString& key, const MString& value, size_t ttl) const override
{ {
if(!CheckCon()) return false; if(!CheckCon()) return false;
@ -235,14 +368,7 @@ class PostgreSQLCache: public GenericCache
return {std::move(val), true}; return {std::move(val), true};
} }
virtual ~PostgreSQLCache() override virtual ~PostgreSQLCache() override = default;
{
if(!CheckCon()) return;
auto* res = PQexec(conn, "DELETE FROM cache WHERE exptime<localtimestamp;");
PQclear(res);
Close();
}
explicit operator bool() const { return conn != nullptr; } explicit operator bool() const { return conn != nullptr; }
}; };
@ -262,14 +388,14 @@ inline GenericCache* CreateCache(const MString& cachedesc)
if(name == "sqlite") if(name == "sqlite")
{ {
auto ret = new SQLiteCache; auto ret = new SQLiteCache;
ret->Init(par); ret->Init();
if(*ret) return ret; if(*ret) return ret;
delete ret; delete ret;
} }
if(name == "postgre" || name == "postgres" || name == "postgresql") if(name == "postgre" || name == "postgres" || name == "postgresql")
{ {
auto ret = new PostgreSQLCache; auto ret = new PostgreSQLCache;
ret->Init(par); ret->Init();
if(*ret) return ret; if(*ret) return ret;
delete ret; delete ret;
} }

27
sources/COPERNICUS.cpp

@ -129,6 +129,9 @@ Error COPERNICUSData::Mirror(const CLArgs& args) const
dsets = dlist.Value(); dsets = dlist.Value();
} }
michlib::RegExpSimple filter((args.contains("filter") ? args.at("filter") : ".*").Buf());
if(filter.Compile() != 0) return Error(pref, MString("Can't compile regular expression ") + filter.RegStr());
CURLRAII chandle; CURLRAII chandle;
for(const auto& dset: dsets) for(const auto& dset: dsets)
{ {
@ -154,20 +157,34 @@ Error COPERNICUSData::Mirror(const CLArgs& args) const
while(rpos != rfiles.size() || lpos != lfiles.size()) while(rpos != rfiles.size() || lpos != lfiles.size())
{ {
if(rpos == rfiles.size()) if(rpos == rfiles.size())
while(lpos != lfiles.size()) rem.push_back(lpos++); while(lpos != lfiles.size())
{
if(filter.Match(lfiles[lpos].name.Buf())) rem.push_back(lpos);
lpos++;
}
if(lpos == lfiles.size()) if(lpos == lfiles.size())
while(rpos != rfiles.size()) down.push_back(rpos++); while(rpos != rfiles.size())
{
if(filter.Match(rfiles[rpos].name.Buf())) down.push_back(rpos);
rpos++;
}
if(rpos == rfiles.size() || lpos == lfiles.size()) continue; if(rpos == rfiles.size() || lpos == lfiles.size()) continue;
if(rfiles[rpos].name < lfiles[lpos].name) if(rfiles[rpos].name < lfiles[lpos].name)
down.push_back(rpos++); {
if(filter.Match(rfiles[rpos].name.Buf())) down.push_back(rpos);
rpos++;
}
else if(lfiles[lpos].name < rfiles[rpos].name) else if(lfiles[lpos].name < rfiles[rpos].name)
rem.push_back(lpos++); {
if(filter.Match(lfiles[lpos].name.Buf())) rem.push_back(lpos);
lpos++;
}
else else
{ {
auto delta = rfiles[rpos].mtime.Epoch() - lfiles[lpos].mtime.Epoch(); auto delta = rfiles[rpos].mtime.Epoch() - lfiles[lpos].mtime.Epoch();
if(delta < 0) delta = -delta; if(delta < 0) delta = -delta;
if(delta > 0 || rfiles[rpos].size != lfiles[lpos].size) upd.emplace_back(rpos, lpos); if((delta > 0 || rfiles[rpos].size != lfiles[lpos].size) && filter.Match(lfiles[lpos].name.Buf())) upd.emplace_back(rpos, lpos);
lpos++; lpos++;
rpos++; rpos++;
} }

14
src/cache.cpp

@ -0,0 +1,14 @@
#define MICHLIB_NOSOURCE
#include "cache.h"
SQLiteConnection::DBType SQLiteConnection::db = nullptr;
size_t SQLiteConnection::count = 0;
std::vector<SQLiteConnection::FuncType> SQLiteConnection::destructs = {};
PostgreSQLConnection::DBType PostgreSQLConnection::conn = nullptr;
size_t PostgreSQLConnection::count = 0;
std::vector<PostgreSQLConnection::FuncType> PostgreSQLConnection::destructs = {};
bool SQLiteCache::regdest = false;
bool PostgreSQLCache::regdest = false;

8
src/copcat.cpp

@ -7,16 +7,20 @@ const MString CopernicusCatalog::caturl = "https://stac.marine.copernicus.eu/met
CopernicusCatalog::CopernicusCatalog() CopernicusCatalog::CopernicusCatalog()
{ {
// Cache
auto oldprefix = michlib::GPL.UsePrefix("COPERNICUS"); auto oldprefix = michlib::GPL.UsePrefix("COPERNICUS");
// Cache
cache.reset(CreateCache(michlib::GPL.ParameterSValue("Cache", ""))); cache.reset(CreateCache(michlib::GPL.ParameterSValue("Cache", "")));
michlib::GPL.UsePrefix(oldprefix);
if(!cache) if(!cache)
{ {
michlib::errmessage("Can't init cache"); michlib::errmessage("Can't init cache");
cache.reset(new FakeCache); cache.reset(new FakeCache);
} }
// Proxy
auto proxyurl = michlib::GPL.ParameterSValue("Proxy", "");
if(proxyurl.Exist()) curl_easy_setopt(chandle, CURLOPT_PROXY, proxyurl.Buf());
michlib::GPL.UsePrefix(oldprefix);
GetCatalog(); GetCatalog();
} }

Loading…
Cancel
Save